In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import HiveContext
from pyspark.context import SparkContext
from pyspark import SparkConf
from pyspark.sql.functions import col, row_number, countDistinct, lit, sum, avg, row_number, md5, sha2, when
from pyspark.sql.window import Window
from pyspark.sql import functions as F
from pyspark.sql.types import StringType, IntegerType, DateType, FloatType, DecimalType
from pyspark.sql import SQLContext
from pyspark.sql import udf


In [2]:
# Legacy entry points built on the existing SparkContext.
# assumes `sc` is pre-defined by the PySpark kernel/shell — TODO confirm
sqlContext = SQLContext(sc)
hive_context = HiveContext(sc)

# SparkSession used by the cells below; Hive support is requested explicitly.
spark = (
    SparkSession.builder
    .appName("EJERCICIO SPARK")
    .enableHiveSupport()
    .getOrCreate()
)


In [3]:
# Local parquet part files for the three input tables, all under one data root.
_DATA_ROOT = "/home/training/data"
parquet_products_path = (
    _DATA_ROOT
    + "/products/part-00000-tid-2452134987220853249-b7b7ad2c-5a72-4006-a202-614e63e70c99-5-1-c000.snappy.parquet"
)
parquet_sellers_path = (
    _DATA_ROOT
    + "/sellers/part-00000-tid-6580020465882417462-624fcc11-67f0-4cff-abb7-804016c31f95-218-1-c000.snappy.parquet"
)
parquet_sales_path = (
    _DATA_ROOT
    + "/sales/part-00000-tid-8977413855564310585-22625781-9ad7-4f0e-b3ed-fb62a0788d8f-227-1-c000.snappy.parquet"
)

In [4]:
# Load the three tables from the local filesystem (hence the file:// scheme).
df_products, df_sellers, df_sales = (
    spark.read.parquet("file://" + _path)
    for _path in (parquet_products_path, parquet_sellers_path, parquet_sales_path)
)

In [5]:
# Preview the products table; these are the defaults of show(), written out.
df_products.show(n=20, truncate=True)

+----------+------------+-----+
|product_id|product_name|price|
+----------+------------+-----+
|         0|   product_0|   22|
|         1|   product_1|   30|
|         2|   product_2|   91|
|         3|   product_3|   37|
|         4|   product_4|  145|
|         5|   product_5|  128|
|         6|   product_6|   66|
|         7|   product_7|  145|
|         8|   product_8|   51|
|         9|   product_9|   44|
|        10|  product_10|   53|
|        11|  product_11|   13|
|        12|  product_12|  104|
|        13|  product_13|  102|
|        14|  product_14|   24|
|        15|  product_15|   14|
|        16|  product_16|   38|
|        17|  product_17|   72|
|        18|  product_18|   16|
|        19|  product_19|   46|
+----------+------------+-----+
only showing top 20 rows



In [6]:
# Preview the sellers table; these are the defaults of show(), written out.
df_sellers.show(n=20, truncate=True)

+---------+-----------+------------+
|seller_id|seller_name|daily_target|
+---------+-----------+------------+
|        0|   seller_0|     2500000|
|        1|   seller_1|      806542|
|        2|   seller_2|      623640|
|        3|   seller_3|     1687106|
|        4|   seller_4|       27991|
|        5|   seller_5|     1652820|
|        6|   seller_6|      179508|
|        7|   seller_7|      401090|
|        8|   seller_8|      670016|
|        9|   seller_9|     1203705|
+---------+-----------+------------+



In [7]:
# Preview the sales table; long bill_raw_text values are truncated in the display.
df_sales.show(n=20, truncate=True)

+--------+----------+---------+----------+---------------+--------------------+
|order_id|product_id|seller_id|      date|num_pieces_sold|       bill_raw_text|
+--------+----------+---------+----------+---------------+--------------------+
|       1|         0|        0|2020-07-06|             64|pqctnxzofwmtwlpsz...|
|       2|         0|        0|2020-07-03|             84|olapxfcjktuajsuwp...|
|       3|         0|        0|2020-07-09|             86|ksrwojmeqpfipdaed...|
|       4|         0|        0|2020-07-09|             71|gpfifiykcpiwfftvo...|
|       5|         0|        0|2020-07-02|             90|qafhecifvdljsflyj...|
|       6|         0|        0|2020-07-08|             43|zxhorgqqououyqnmn...|
|       7|         0|        0|2020-07-08|              7|uypazquoqxxevgqff...|
|       8|         0|        0|2020-07-09|             68|wwchozldqkostuang...|
|       9|         0|        0|2020-07-08|             40|xglmmsgbiuclmwkpn...|
|      10|         0|        0|2020-07-0

In [8]:
# Print the schema of each table as read from parquet.
for _frame in (df_products, df_sellers, df_sales):
    _frame.printSchema()

root
 |-- product_id: string (nullable = true)
 |-- product_name: string (nullable = true)
 |-- price: string (nullable = true)

root
 |-- seller_id: string (nullable = true)
 |-- seller_name: string (nullable = true)
 |-- daily_target: string (nullable = true)

root
 |-- order_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- seller_id: string (nullable = true)
 |-- date: string (nullable = true)
 |-- num_pieces_sold: string (nullable = true)
 |-- bill_raw_text: string (nullable = true)



In [9]:
def _as_int(column_name):
    """Return the named column cast to IntegerType."""
    return col(column_name).cast(IntegerType())


# Every column is read as a string; cast the ids, amounts and dates to real types.
df_products = df_products.select(
    _as_int("product_id"),
    col("product_name"),
    # DecimalType() with no arguments is decimal(10,0)
    col("price").cast(DecimalType()),
)
df_sellers = df_sellers.select(
    _as_int("seller_id"),
    col("seller_name"),
    _as_int("daily_target"),
)
df_sales = df_sales.select(
    _as_int("order_id"),
    _as_int("product_id"),
    _as_int("seller_id"),
    col("date").cast(DateType()),
    _as_int("num_pieces_sold"),
    col("bill_raw_text"),
)

In [10]:
# Print the schemas again to confirm the casts took effect.
for _frame in (df_products, df_sellers, df_sales):
    _frame.printSchema()

root
 |-- product_id: integer (nullable = true)
 |-- product_name: string (nullable = true)
 |-- price: decimal(10,0) (nullable = true)

root
 |-- seller_id: integer (nullable = true)
 |-- seller_name: string (nullable = true)
 |-- daily_target: integer (nullable = true)

root
 |-- order_id: integer (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- seller_id: integer (nullable = true)
 |-- date: date (nullable = true)
 |-- num_pieces_sold: integer (nullable = true)
 |-- bill_raw_text: string (nullable = true)



### 1.a Averigüe cuántos pedidos, cuántos productos y cuántos vendedores hay en los datos. 

In [11]:
# Number of rows in the products table.
df_products.count()

50000

In [12]:
# Number of rows in the sellers table.
df_sellers.count()

10

In [13]:
# Number of rows in the sales table (one row per order line).
df_sales.count()

400040

In [14]:
# Register the products DataFrame as a temp view so it can be queried with Spark SQL,
# then count its rows with SQL.
df_products.createOrReplaceTempView("df_products")
nproductos = spark.sql("SELECT count(*) as cantidad_de_productos from df_products")
nproductos.show()

+---------------------+
|cantidad_de_productos|
+---------------------+
|                50000|
+---------------------+



In [15]:
# Register the sellers DataFrame as a temp view and count its rows with SQL.
# NOTE(review): the result is stored in `nproductos` although it holds the seller
# count; the name is kept unchanged in case other cells reference it.
df_sellers.createOrReplaceTempView("df_sellers")
nproductos = spark.sql("SELECT count(*) as cantidad_de_vendedores from df_sellers")
nproductos.show()

+----------------------+
|cantidad_de_vendedores|
+----------------------+
|                    10|
+----------------------+



In [16]:
# Register the sales DataFrame as a temp view and count its rows with SQL.
# NOTE(review): the result is stored in `nproductos` although it holds the order
# count; the name is kept unchanged in case other cells reference it.
df_sales.createOrReplaceTempView("df_sales")
nproductos = spark.sql("SELECT count(*) as cantidad_de_pedidos from df_sales")
nproductos.show()

+-------------------+
|cantidad_de_pedidos|
+-------------------+
|             400040|
+-------------------+



### 1.b ¿Cuántos productos se han vendido al menos una vez?

In [18]:
# Count the distinct product_id values that appear in the sales table.
sales_distinct = df_sales.agg(countDistinct("product_id"))
sales_distinct.show()

# Alternative kept from the original notebook:
# df_sales.select("product_id").distinct().count()

+--------------------------+
|count(DISTINCT product_id)|
+--------------------------+
|                     16515|
+--------------------------+



In [19]:
# SQL version: number of distinct products that appear in at least one sale.
products_distinct = spark.sql("select count(distinct(product_id)) as productos_unicos_vendidos from df_sales ")
products_distinct.show()

+-------------------------+
|productos_unicos_vendidos|
+-------------------------+
|                    16515|
+-------------------------+



### 1.c ¿Cuál es el producto contenido en más pedidos?

In [20]:
# Count the sales rows per product and show the product with the most rows.
(
    df_sales
    .groupBy("product_id")
    .count()
    .orderBy(F.desc("count"))
    .show(1)
)

+----------+------+
|product_id| count|
+----------+------+
|         0|380000|
+----------+------+
only showing top 1 row



In [21]:
# SQL version: count the sales rows per product and keep only the product
# with the highest count.
prod_mas_vendido = spark.sql("select product_id, count(*) as cantidad_vendida \
                                from df_sales \
                                group by product_id \
                                order by cantidad_vendida desc\
                                limit 1")
prod_mas_vendido.show()

+----------+----------------+
|product_id|cantidad_vendida|
+----------+----------------+
|         0|          380000|
+----------+----------------+



### 2. ¿Cuántos productos distintos se han vendido en cada día?

In [22]:
# For each date, count how many different products were sold, in date order.
df_dist_day = (
    df_sales
    .groupBy("date")
    .agg(F.countDistinct("product_id").alias("cantidad de productos distintos"))
    .orderBy("date")
)
df_dist_day.show()

+----------+-------------------------------+
|      date|cantidad de productos distintos|
+----------+-------------------------------+
|2020-07-01|                           1997|
|2020-07-02|                           1942|
|2020-07-03|                           1908|
|2020-07-04|                           2001|
|2020-07-05|                           1976|
|2020-07-06|                           1977|
|2020-07-07|                           1903|
|2020-07-08|                           2004|
|2020-07-09|                           2005|
|2020-07-10|                           1968|
+----------+-------------------------------+



In [23]:
# SQL version: distinct products sold per date, ordered by date.
prod_distinct_day = spark.sql("select date, count(distinct(product_id)) as cantidad_productos_unicos\
                                from df_sales\
                                group by date\
                                order by date")
prod_distinct_day.show()

+----------+-------------------------+
|      date|cantidad_productos_unicos|
+----------+-------------------------+
|2020-07-01|                     1997|
|2020-07-02|                     1942|
|2020-07-03|                     1908|
|2020-07-04|                     2001|
|2020-07-05|                     1976|
|2020-07-06|                     1977|
|2020-07-07|                     1903|
|2020-07-08|                     2004|
|2020-07-09|                     2005|
|2020-07-10|                     1968|
+----------+-------------------------+



### 3. ¿Cuál es el ingreso promedio de los pedidos? 

In [24]:
# Join each sale with its product to obtain the unit price, then add the order
# subtotal (pieces sold x unit price). The product is already a Column
# expression, so it does not need to be wrapped in F.lit().
df_sales_prod = (
    df_sales
    .join(df_products, ["product_id"])
    .select(
        "order_id",
        "product_id",
        "num_pieces_sold",
        "price",
        (col("num_pieces_sold") * col("price")).alias("subtotal"),
    )
)
df_sales_prod.show()

+--------+----------+---------------+-----+--------+
|order_id|product_id|num_pieces_sold|price|subtotal|
+--------+----------+---------------+-----+--------+
|       1|         0|             64|   22|    1408|
|       2|         0|             84|   22|    1848|
|       3|         0|             86|   22|    1892|
|       4|         0|             71|   22|    1562|
|       5|         0|             90|   22|    1980|
|       6|         0|             43|   22|     946|
|       7|         0|              7|   22|     154|
|       8|         0|             68|   22|    1496|
|       9|         0|             40|   22|     880|
|      10|         0|             78|   22|    1716|
|      11|         0|             44|   22|     968|
|      12|         0|              7|   22|     154|
|      13|         0|             52|   22|    1144|
|      14|         0|             76|   22|    1672|
|      15|         0|             38|   22|     836|
|      16|         0|             74|   22|   

In [25]:
# Mean subtotal over all joined sales rows.
# Dict-style form kept from the original notebook:
# df_sales_prod.agg({'subtotal': 'avg'}).show()
df_sales_prod.agg(avg("subtotal")).show()


+-------------+
|avg(subtotal)|
+-------------+
|    1247.0087|
+-------------+



In [26]:
# SQL version: join sales with products and average price * pieces sold.
avg_subtotal = spark.sql("select avg(p.price*s.num_pieces_sold) as promedio_ventas\
                            from df_sales s join df_products p \
                            on s.product_id = p.product_id")
avg_subtotal.show()

+---------------+
|promedio_ventas|
+---------------+
|      1247.0087|
+---------------+



### 4. Para cada vendedor, ¿cuál es el porcentaje promedio de contribución de un pedido a la cuota diaria del vendedor? 

In [27]:
# Total pieces sold by each seller on each date.
# (`sum` and `avg` are the pyspark.sql.functions aggregates imported at the top.)
df = df_sales.groupby("seller_id", "date").agg(sum("num_pieces_sold").alias("sold_by_date"))

# Ratio of each day's total to the seller's daily target, averaged per seller.
# The sort is applied last so that it actually determines the displayed order,
# matching the SQL version of this query.
# NOTE(review): the heading asks for the contribution of a single order, whereas
# this averages the per-day totals — confirm which definition is intended.
df_4 = (
    df.join(df_sellers, ["seller_id"])
    .withColumn("ratio", col("sold_by_date") / col("daily_target"))
    .groupby("seller_id")
    .agg(avg("ratio").alias("avg_contribution_perc"))
    .orderBy("seller_id")
)
df_4.show()


+---------+---------------------+
|seller_id|avg_contribution_perc|
+---------+---------------------+
|        0|           0.76832768|
|        1| 0.014035598890076401|
|        2| 0.017860464370470144|
|        3| 0.006408251763671044|
|        4|   0.4040727376656782|
|        5| 0.006950908144867561|
|        6| 0.059841901196604055|
|        7| 0.029155551123189304|
|        8| 0.017612564476072214|
|        9| 0.009113860954303589|
+---------+---------------------+



In [29]:
# SQL version: the inner query sums the pieces sold per seller and date; the outer
# query divides each daily sum by the seller's daily_target and averages per seller.
avg_contribucion = spark.sql("select s.seller_id, avg(s.sum_pieces/v.daily_target) as avg_contribution_perc\
                                from df_sellers v join \
                                    (select seller_id, date, sum(num_pieces_sold) as sum_pieces \
                                    from df_sales \
                                    group by seller_id, date) s\
                                on v.seller_id = s.seller_id\
                                group by s.seller_id\
                                order by s.seller_id"
                            )
avg_contribucion.show()

+---------+---------------------+
|seller_id|avg_contribution_perc|
+---------+---------------------+
|        0|           0.76832768|
|        1| 0.014035598890076401|
|        2| 0.017860464370470144|
|        3| 0.006408251763671044|
|        4|   0.4040727376656782|
|        5| 0.006950908144867561|
|        6| 0.059841901196604055|
|        7| 0.029155551123189304|
|        8| 0.017612564476072214|
|        9| 0.009113860954303589|
+---------+---------------------+



### 5.a ¿Quiénes son las segundas personas que más venden de cada producto?

In [30]:
# Pieces sold per (product, seller) pair; `temp` is reused by later cells.
temp = (
    df_sales
    .groupBy("product_id", "seller_id")
    .agg(sum("num_pieces_sold").alias("n_piezas_vendidas"))
    .orderBy("product_id")
)

# Number the sellers of each product from most to fewest pieces sold and keep
# the one in second position.
# NOTE(review): row_number() does not define an order between sellers with equal
# totals, and products with a single seller have no row 2 and drop out of the result.
windowPart2 = Window.partitionBy("product_id").orderBy(F.desc("n_piezas_vendidas"))
ranked_sellers = temp.withColumn("row", row_number().over(windowPart2))
df_5 = (
    ranked_sellers
    .filter(col("row") == 2)
    .select("product_id", "seller_id", "n_piezas_vendidas", "row")
)

df_5.show()

+----------+---------+-----------------+---+
|product_id|seller_id|n_piezas_vendidas|row|
+----------+---------+-----------------+---+
|        13|        8|                6|  2|
|        18|        8|               22|  2|
|        30|        5|               56|  2|
|        57|        9|               74|  2|
|        65|        7|               22|  2|
|        74|        6|               81|  2|
|        88|        8|               50|  2|
|        99|        7|               71|  2|
|       103|        4|               10|  2|
|       129|        9|               47|  2|
|       136|        8|               41|  2|
|       153|        5|               13|  2|
|       175|        1|               28|  2|
|       194|        1|                4|  2|
|       195|        6|               25|  2|
|       203|        9|               26|  2|
|       228|        3|                6|  2|
|       260|        3|               45|  2|
|       262|        9|               40|  2|
|       30

In [31]:
# SQL version: the innermost query sums the pieces sold per (product, seller);
# the middle query numbers the sellers of each product by descending total;
# the outer query keeps the seller in second position.
max_sellers2 = spark.sql("select * from\
                                (select product_id, seller_id, pieces_sold,\
                                row_number() over(partition by product_id order by pieces_sold desc) as row\
                                from (select product_id, seller_id, sum(num_pieces_sold) as pieces_sold\
                                            from df_sales\
                                            group by product_id, seller_id \
                                            order by product_id))\
                        where row = 2")
max_sellers2.show()

+----------+---------+-----------+---+
|product_id|seller_id|pieces_sold|row|
+----------+---------+-----------+---+
|        13|        8|          6|  2|
|        18|        8|         22|  2|
|        30|        5|         56|  2|
|        57|        9|         74|  2|
|        65|        7|         22|  2|
|        74|        6|         81|  2|
|        88|        8|         50|  2|
|        99|        7|         71|  2|
|       103|        4|         10|  2|
|       129|        9|         47|  2|
|       136|        8|         41|  2|
|       153|        5|         13|  2|
|       175|        1|         28|  2|
|       194|        1|          4|  2|
|       195|        6|         25|  2|
|       203|        9|         26|  2|
|       228|        3|          6|  2|
|       260|        3|         45|  2|
|       262|        9|         40|  2|
|       303|        7|          6|  2|
+----------+---------+-----------+---+
only showing top 20 rows



### ¿Y quiénes son los vendedores que menos venden de cada producto?

In [32]:
# Number the sellers of each product from fewest to most pieces sold and keep
# the first one, i.e. the seller with the lowest total for that product.
windowPart3 = Window.partitionBy("product_id").orderBy(col("n_piezas_vendidas").asc())
lowest_ranked = temp.withColumn("row", row_number().over(windowPart3))
df_5b = (
    lowest_ranked
    .filter(col("row") == 1)
    .select("product_id", "seller_id", "n_piezas_vendidas", "row")
)

df_5b.show()

+----------+---------+-----------------+---+
|product_id|seller_id|n_piezas_vendidas|row|
+----------+---------+-----------------+---+
|         0|        0|         19208192|  1|
|         1|        7|               25|  1|
|         2|        1|               28|  1|
|         5|        4|               35|  1|
|         6|        8|               68|  1|
|        12|        8|              100|  1|
|        13|        8|                6|  1|
|        18|        8|               22|  1|
|        19|        7|               51|  1|
|        26|        6|               47|  1|
|        30|        5|               56|  1|
|        34|        2|               42|  1|
|        36|        2|               25|  1|
|        37|        2|               32|  1|
|        39|        1|               68|  1|
|        42|        2|               39|  1|
|        44|        9|               70|  1|
|        47|        8|               55|  1|
|        48|        1|               33|  1|
|        4

In [33]:
# SQL version of the lowest seller per product.
# The window must be ordered by the pieces sold (ascending). Ordering by
# product_id, the partition key, leaves the row order inside each partition
# undefined, so row 1 would be an arbitrary seller.
spark.sql("""
    select * from
        (select product_id, seller_id, pieces_sold,
                row_number() over(partition by product_id order by pieces_sold asc) as row
         from (select product_id, seller_id, sum(num_pieces_sold) as pieces_sold
               from df_sales
               group by product_id, seller_id))
    where row = 1
""").show()

+----------+---------+-----------+---+
|product_id|seller_id|pieces_sold|row|
+----------+---------+-----------+---+
|         0|        0|   19208192|  1|
|         1|        7|         25|  1|
|         2|        1|         28|  1|
|         5|        4|         35|  1|
|         6|        8|         68|  1|
|        12|        8|        100|  1|
|        13|        8|          6|  1|
|        18|        8|         22|  1|
|        19|        7|         51|  1|
|        26|        6|         47|  1|
|        30|        5|         56|  1|
|        34|        2|         42|  1|
|        36|        2|         25|  1|
|        37|        2|         32|  1|
|        39|        1|         68|  1|
|        42|        2|         39|  1|
|        44|        9|         70|  1|
|        47|        8|         55|  1|
|        48|        1|         33|  1|
|        49|        5|         38|  1|
+----------+---------+-----------+---+
only showing top 20 rows



### ¿Quiénes son esos vendedores para el producto con `product_id = 0`?

In [34]:
# Show the per-seller totals for product 0 only.
temp.filter(col("product_id") == 0).show()

+----------+---------+-----------------+
|product_id|seller_id|n_piezas_vendidas|
+----------+---------+-----------------+
|         0|        0|         19208192|
+----------+---------+-----------------+



In [35]:
# SQL version: pieces sold per seller for product 0 only.
spark.sql("select product_id, seller_id, sum(num_pieces_sold) as pieces_sold\
                from df_sales\
                where product_id = 0\
                group by product_id, seller_id\
                order by product_id").show()

+----------+---------+-----------+
|product_id|seller_id|pieces_sold|
+----------+---------+-----------+
|         0|        0|   19208192|
+----------+---------+-----------+



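### 6. Cree una nueva columna llamada "hashed_bill": si `order_id` es par, aplique MD5 de forma iterativa a `bill_raw_text` (una vez por cada "A" mayúscula del texto); si es impar, aplique SHA-256.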

In [37]:
# Keep only the columns needed to build the hashed bill.
df_6 = df_sales.select(col("order_id"), col("bill_raw_text"))


In [38]:
# Try the built-in MD5 column function on bill_raw_text (preview only, not stored).
md5_preview = df_6.withColumn("hashed_bill", md5(col("bill_raw_text")))
md5_preview.show()

+--------+--------------------+--------------------+
|order_id|       bill_raw_text|         hashed_bill|
+--------+--------------------+--------------------+
|       1|pqctnxzofwmtwlpsz...|8bc043c5b5667b430...|
|       2|olapxfcjktuajsuwp...|73d3cb489a6d7f404...|
|       3|ksrwojmeqpfipdaed...|138702f6603eb97e4...|
|       4|gpfifiykcpiwfftvo...|e31df5d21e8278451...|
|       5|qafhecifvdljsflyj...|62f23b475f2a654d8...|
|       6|zxhorgqqououyqnmn...|8953e20d1c635ba25...|
|       7|uypazquoqxxevgqff...|4f8f4cc13ea3527f1...|
|       8|wwchozldqkostuang...|5187c7545e9f82b6b...|
|       9|xglmmsgbiuclmwkpn...|195ed7180971fe395...|
|      10|ejtgoapbwptkhgnhs...|c377e2166b4a5b518...|
|      11|gyzbmrvmgkthjobod...|1e826ede7ec9fa247...|
|      12|gladpurwsbqvesdgz...|25fc1145163435832...|
|      13|szuihdpyqpqaubdwg...|2ea8ab55840b84f8e...|
|      14|ztaiiiepyksrfpgco...|5335a077d42a688d9...|
|      15|dhoezgeyefkmwwbch...|5ad62c9f4c32732b1...|
|      16|shdqilxigwbokuhnf...|b69adf59ef59b6d

In [39]:
# Try the built-in SHA-256 column function on bill_raw_text (preview only, not stored).
sha256_preview = df_6.withColumn("hashed_bill", sha2(col("bill_raw_text"), 256))
sha256_preview.show()

+--------+--------------------+--------------------+
|order_id|       bill_raw_text|         hashed_bill|
+--------+--------------------+--------------------+
|       1|pqctnxzofwmtwlpsz...|a01585b2954e28192...|
|       2|olapxfcjktuajsuwp...|c159e77eda8927078...|
|       3|ksrwojmeqpfipdaed...|7d30cb71e384aa843...|
|       4|gpfifiykcpiwfftvo...|acd83bea9abb6a323...|
|       5|qafhecifvdljsflyj...|342de8e8affa5a01e...|
|       6|zxhorgqqououyqnmn...|6d7ac6bf1fa2b43f7...|
|       7|uypazquoqxxevgqff...|f2555e65c65446e71...|
|       8|wwchozldqkostuang...|0f01f25d46a0ad27e...|
|       9|xglmmsgbiuclmwkpn...|ee1b6eec80c3197b9...|
|      10|ejtgoapbwptkhgnhs...|493bf07cd19b52d38...|
|      11|gyzbmrvmgkthjobod...|c6592baef11c6b997...|
|      12|gladpurwsbqvesdgz...|3b67d8525f9ad5da2...|
|      13|szuihdpyqpqaubdwg...|eb00949f0279c0825...|
|      14|ztaiiiepyksrfpgco...|00eac44507ef793f7...|
|      15|dhoezgeyefkmwwbch...|389e5b181fa80691b...|
|      16|shdqilxigwbokuhnf...|3793c9cff9308ce

In [40]:
# Plain-Python function, later registered as a UDF, that applies MD5 iteratively.
def hashmd5(text):
    """Hash ``text`` with MD5 once for every capital "A" it contains.

    The number of rounds is taken from the original text. Each round hashes the
    hexadecimal digest produced by the previous round. A text without any "A"
    is returned unchanged, and ``None`` is passed through as ``None``.

    Args:
        text: The raw bill text, or ``None``.

    Returns:
        The final hexadecimal MD5 digest as a string, the unchanged text when
        it contains no "A", or ``None`` when ``text`` is ``None``.
    """
    # Imported here so the function carries its own dependency. The md5 imported
    # from pyspark.sql.functions builds Column expressions and cannot hash a
    # Python string.
    import hashlib

    if text is None:
        return None

    rounds = text.count("A")
    digest = text
    for _ in range(rounds):
        digest = hashlib.md5(digest.encode("utf-8")).hexdigest()
    return digest

# Register the Python function under the SQL name "hashmd5"; the returned object
# is usable from the DataFrame API. StringType is the default return type of
# register(), written out here.
md5UDF = spark.udf.register("hashmd5", hashmd5, StringType())

In [41]:
# Build hashed_bill: even order_id -> the iterated-MD5 UDF, odd order_id -> SHA-256.
df_hash = df_6.withColumn(
    "hashed_bill",
    when(col("order_id") % 2 == 0, md5UDF(col("bill_raw_text")))
    .otherwise(sha2(col("bill_raw_text"), 256)),
)
df_hash.printSchema()
df_hash.show()
# show() fails in the recorded run: the traceback shows the Python worker starting
# under /usr/lib64/python2.6 and raising "NameError: name 'memoryview' is not
# defined" while importing pyspark's cloudpickle.
# NOTE(review): presumably the workers use a different, older interpreter than the
# driver; pointing PYSPARK_PYTHON at the driver's interpreter should fix it — TODO confirm.

root
 |-- order_id: integer (nullable = true)
 |-- bill_raw_text: string (nullable = true)
 |-- hashed_bill: string (nullable = true)



Py4JJavaError: An error occurred while calling o359.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 100.0 failed 1 times, most recent failure: Lost task 0.0 in stage 100.0 (TID 5291, localhost, executor driver): org.apache.spark.SparkException: 
Error from python worker:
  Traceback (most recent call last):
    File "/usr/lib64/python2.6/runpy.py", line 104, in _run_module_as_main
      loader, code, fname = _get_module_details(mod_name)
    File "/usr/lib64/python2.6/runpy.py", line 79, in _get_module_details
      loader = get_loader(mod_name)
    File "/usr/lib64/python2.6/pkgutil.py", line 456, in get_loader
      return find_loader(fullname)
    File "/usr/lib64/python2.6/pkgutil.py", line 466, in find_loader
      for importer in iter_importers(fullname):
    File "/usr/lib64/python2.6/pkgutil.py", line 422, in iter_importers
      __import__(pkg)
    File "/opt/hadoop/spark/python/lib/pyspark.zip/pyspark/__init__.py", line 51, in <module>
    File "/opt/hadoop/spark/python/lib/pyspark.zip/pyspark/context.py", line 31, in <module>
    File "/opt/hadoop/spark/python/lib/pyspark.zip/pyspark/accumulators.py", line 97, in <module>
    File "/opt/hadoop/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 72, in <module>
    File "/opt/hadoop/spark/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 246, in <module>
    File "/opt/hadoop/spark/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 270, in CloudPickler
  NameError: name 'memoryview' is not defined
PYTHONPATH was:
  /opt/hadoop/spark/python/lib/pyspark.zip:/opt/hadoop/spark/python/lib/py4j-0.10.7-src.zip:/opt/hadoop/spark/jars/spark-core_2.11-2.4.8.jar:/opt/hadoop/spark/python/lib/py4j-0.10.7-src.zip:/opt/hadoop/spark/python/::/opt/hadoop/spark/python/lib/py4j-0.10.7-src.zip:/opt/hadoop/spark/python/:/opt/hadoop/spark/python/lib/py4j-0.10.7-src.zip:/opt/hadoop/spark/python/
org.apache.spark.SparkException: No port number in pyspark.daemon's stdout
	at org.apache.spark.api.python.PythonWorkerFactory.startDaemon(PythonWorkerFactory.scala:204)
	at org.apache.spark.api.python.PythonWorkerFactory.createThroughDaemon(PythonWorkerFactory.scala:122)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:95)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:117)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:109)
	at org.apache.spark.sql.execution.python.BatchEvalPythonExec.evaluate(BatchEvalPythonExec.scala:81)
	at org.apache.spark.sql.execution.python.EvalPythonExec$$anonfun$doExecute$1.apply(EvalPythonExec.scala:128)
	at org.apache.spark.sql.execution.python.EvalPythonExec$$anonfun$doExecute$1.apply(EvalPythonExec.scala:89)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:823)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:823)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:411)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:417)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1925)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1913)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1912)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1912)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:948)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:948)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:948)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2146)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2095)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2084)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:759)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2067)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2088)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2107)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:370)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
	at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3388)
	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2550)
	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2550)
	at org.apache.spark.sql.Dataset$$anonfun$53.apply(Dataset.scala:3369)
	at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:80)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:127)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75)
	at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$withAction(Dataset.scala:3368)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2550)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:2764)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:254)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:291)
	at sun.reflect.GeneratedMethodAccessor99.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:497)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.SparkException: 
Error from python worker:
  Traceback (most recent call last):
    File "/usr/lib64/python2.6/runpy.py", line 104, in _run_module_as_main
      loader, code, fname = _get_module_details(mod_name)
    File "/usr/lib64/python2.6/runpy.py", line 79, in _get_module_details
      loader = get_loader(mod_name)
    File "/usr/lib64/python2.6/pkgutil.py", line 456, in get_loader
      return find_loader(fullname)
    File "/usr/lib64/python2.6/pkgutil.py", line 466, in find_loader
      for importer in iter_importers(fullname):
    File "/usr/lib64/python2.6/pkgutil.py", line 422, in iter_importers
      __import__(pkg)
    File "/opt/hadoop/spark/python/lib/pyspark.zip/pyspark/__init__.py", line 51, in <module>
    File "/opt/hadoop/spark/python/lib/pyspark.zip/pyspark/context.py", line 31, in <module>
    File "/opt/hadoop/spark/python/lib/pyspark.zip/pyspark/accumulators.py", line 97, in <module>
    File "/opt/hadoop/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 72, in <module>
    File "/opt/hadoop/spark/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 246, in <module>
    File "/opt/hadoop/spark/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 270, in CloudPickler
  NameError: name 'memoryview' is not defined
PYTHONPATH was:
  /opt/hadoop/spark/python/lib/pyspark.zip:/opt/hadoop/spark/python/lib/py4j-0.10.7-src.zip:/opt/hadoop/spark/jars/spark-core_2.11-2.4.8.jar:/opt/hadoop/spark/python/lib/py4j-0.10.7-src.zip:/opt/hadoop/spark/python/::/opt/hadoop/spark/python/lib/py4j-0.10.7-src.zip:/opt/hadoop/spark/python/:/opt/hadoop/spark/python/lib/py4j-0.10.7-src.zip:/opt/hadoop/spark/python/
org.apache.spark.SparkException: No port number in pyspark.daemon's stdout
	at org.apache.spark.api.python.PythonWorkerFactory.startDaemon(PythonWorkerFactory.scala:204)
	at org.apache.spark.api.python.PythonWorkerFactory.createThroughDaemon(PythonWorkerFactory.scala:122)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:95)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:117)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:109)
	at org.apache.spark.sql.execution.python.BatchEvalPythonExec.evaluate(BatchEvalPythonExec.scala:81)
	at org.apache.spark.sql.execution.python.EvalPythonExec$$anonfun$doExecute$1.apply(EvalPythonExec.scala:128)
	at org.apache.spark.sql.execution.python.EvalPythonExec$$anonfun$doExecute$1.apply(EvalPythonExec.scala:89)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:823)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:823)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:411)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:417)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	... 1 more


In [42]:
# Alternative check using only built-in column functions: MD5 for even
# order_id values and SHA-256 for odd ones, to confirm the parity filter works.

dff = df_6.withColumn(
    "hashed_bill",
    when(df_6["order_id"] % 2 == 0, md5(df_6["bill_raw_text"]))
    .otherwise(sha2(df_6["bill_raw_text"], 256))
)
dff.show()

+--------+--------------------+--------------------+
|order_id|       bill_raw_text|         hashed_bill|
+--------+--------------------+--------------------+
|       1|pqctnxzofwmtwlpsz...|a01585b2954e28192...|
|       2|olapxfcjktuajsuwp...|73d3cb489a6d7f404...|
|       3|ksrwojmeqpfipdaed...|7d30cb71e384aa843...|
|       4|gpfifiykcpiwfftvo...|e31df5d21e8278451...|
|       5|qafhecifvdljsflyj...|342de8e8affa5a01e...|
|       6|zxhorgqqououyqnmn...|8953e20d1c635ba25...|
|       7|uypazquoqxxevgqff...|f2555e65c65446e71...|
|       8|wwchozldqkostuang...|5187c7545e9f82b6b...|
|       9|xglmmsgbiuclmwkpn...|ee1b6eec80c3197b9...|
|      10|ejtgoapbwptkhgnhs...|c377e2166b4a5b518...|
|      11|gyzbmrvmgkthjobod...|c6592baef11c6b997...|
|      12|gladpurwsbqvesdgz...|25fc1145163435832...|
|      13|szuihdpyqpqaubdwg...|eb00949f0279c0825...|
|      14|ztaiiiepyksrfpgco...|5335a077d42a688d9...|
|      15|dhoezgeyefkmwwbch...|389e5b181fa80691b...|
|      16|shdqilxigwbokuhnf...|b69adf59ef59b6d