In [116]:
from pyspark.sql import SparkSession, Row
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, LongType
from pyspark.sql.functions import col, count, desc, count_distinct, broadcast, concat, lit, upper

In [2]:
# Build the SparkSession used by every cell below, running Spark locally
# on all available cores ("local[*]").
spark = (
    SparkSession.builder
    .appName("Pyspark Overview")
    .master("local[*]")
    .getOrCreate()
)

23/06/15 04:15:44 WARN Utils: Your hostname, MacBook-Air-de-Gabriel.local resolves to a loopback address: 127.0.0.1; using 192.168.1.9 instead (on interface en0)
23/06/15 04:15:44 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/06/15 04:15:45 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


#### Spark Configuration

In [13]:
# Read the current value of the automatic broadcast-join size threshold.
spark.conf.get("spark.sql.autoBroadcastJoinThreshold")

'10485760b'

In [16]:
# Maximum size, in bytes, of a DataFrame that Spark will broadcast
# automatically when planning a join (10485760 bytes = 10 MiB).
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 10485760)

# Disable automatic broadcast joins. Spark documents -1 as the value that
# turns the feature off; a threshold of 1 would merely restrict it to
# tables of at most one byte.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

### Reading Files

In [None]:
# Read the customers CSV using the csv() shortcut: the first line is the
# header and column types are inferred from the data. Preview the result.
(
    spark.read
    .csv(
        "../dataset/csv_data/olist_customers_dataset.csv",
        header=True,
        inferSchema=True,
    )
    .show()
)

In [33]:
# Same CSV read, written with the reader's builder methods: the format and
# the two options are set first, then the file is loaded and previewed.
(
    spark.read
    .format("csv")
    .options(header=True, inferSchema=True)
    .csv("../dataset/csv_data/olist_customers_dataset.csv")
    .show()
)

                                                                                

+--------------------+--------------------+------------------------+--------------------+--------------+
|         customer_id|  customer_unique_id|customer_zip_code_prefix|       customer_city|customer_state|
+--------------------+--------------------+------------------------+--------------------+--------------+
|06b8999e2fba1a1fb...|861eff4711a542e4b...|                   14409|              franca|            SP|
|18955e83d337fd6b2...|290c77bc529b7ac93...|                    9790|sao bernardo do c...|            SP|
|4e7b3e00288586ebd...|060e732b5b29e8181...|                    1151|           sao paulo|            SP|
|b2b6027bc5c5109e5...|259dac757896d24d7...|                    8775|     mogi das cruzes|            SP|
|4f2d8ab171c80ec83...|345ecd01c38d18a90...|                   13056|            campinas|            SP|
|879864dab9bc30475...|4c93744516667ad3b...|                   89254|      jaragua do sul|            SC|
|fd826e7cf63160e53...|addec96d2e059c80c...|            

In [144]:
# Read the customers dataset from Parquet, then preview the rows and
# print the schema Spark picked up for it.
df_parquet = spark.read.parquet(
    "../dataset/parquet_data/olist_customers_dataset.parquet"
)
df_parquet.show()
df_parquet.printSchema()

+--------------------+--------------------+------------------------+--------------------+--------------+
|         customer_id|  customer_unique_id|customer_zip_code_prefix|       customer_city|customer_state|
+--------------------+--------------------+------------------------+--------------------+--------------+
|06b8999e2fba1a1fb...|861eff4711a542e4b...|                   14409|              franca|            SP|
|18955e83d337fd6b2...|290c77bc529b7ac93...|                    9790|sao bernardo do c...|            SP|
|4e7b3e00288586ebd...|060e732b5b29e8181...|                    1151|           sao paulo|            SP|
|b2b6027bc5c5109e5...|259dac757896d24d7...|                    8775|     mogi das cruzes|            SP|
|4f2d8ab171c80ec83...|345ecd01c38d18a90...|                   13056|            campinas|            SP|
|879864dab9bc30475...|4c93744516667ad3b...|                   89254|      jaragua do sul|            SC|
|fd826e7cf63160e53...|addec96d2e059c80c...|            

In [138]:
# Explicit schema for the customers Parquet file: three required columns
# (the two ids and the zip-code prefix) and two nullable ones.
# NOTE(review): printSchema() below shows whether Spark keeps the
# nullable=False flags for a file source - confirm before relying on them.
parquet_squema = StructType([
    StructField(name='customer_id', dataType=StringType(), nullable=False),
    StructField(name='customer_unique_id', dataType=StringType(), nullable=False),
    StructField(name='customer_zip_code_prefix', dataType=LongType(), nullable=False),
    StructField(name='customer_city', dataType=StringType(), nullable=True),
    StructField(name='customer_state', dataType=StringType(), nullable=True),
])

# Read the same Parquet file, this time imposing the schema declared above.
df_parquet_squema = (
    spark.read
    .schema(parquet_squema)
    .parquet("../dataset/parquet_data/olist_customers_dataset.parquet")
)
df_parquet_squema.show()
df_parquet_squema.printSchema()

+--------------------+--------------------+------------------------+--------------------+--------------+
|         customer_id|  customer_unique_id|customer_zip_code_prefix|       customer_city|customer_state|
+--------------------+--------------------+------------------------+--------------------+--------------+
|06b8999e2fba1a1fb...|861eff4711a542e4b...|                   14409|              franca|            SP|
|18955e83d337fd6b2...|290c77bc529b7ac93...|                    9790|sao bernardo do c...|            SP|
|4e7b3e00288586ebd...|060e732b5b29e8181...|                    1151|           sao paulo|            SP|
|b2b6027bc5c5109e5...|259dac757896d24d7...|                    8775|     mogi das cruzes|            SP|
|4f2d8ab171c80ec83...|345ecd01c38d18a90...|                   13056|            campinas|            SP|
|879864dab9bc30475...|4c93744516667ad3b...|                   89254|      jaragua do sul|            SC|
|fd826e7cf63160e53...|addec96d2e059c80c...|            

#### Dataframe: Criação

In [34]:
# Sample inputs shared by the examples below.
columns = ["language", "users_count"]
data = [
    ("Java", "20000"),
    ("Python", "100000"),
    ("Scala", "3000"),
]
data_dict = [
    {'name': 'Alice', 'age': 1},
    {'name': 'Gabriel', 'age': 34},
]

# 1. From a local list of tuples plus the column names.
df = spark.createDataFrame(data, columns)
df.show()

# 2. From an RDD, naming the columns through toDF().
rdd = spark.sparkContext.parallelize(data)
rdd.toDF(columns).show()

# 3. From an RDD through createDataFrame() without names:
#    the columns come out as _1 and _2.
spark.createDataFrame(rdd).show()

# 4. From an RDD through createDataFrame(), renaming the columns afterwards.
spark.createDataFrame(rdd).toDF(*columns).show()

# 5. From an RDD through createDataFrame(), passing the names directly.
spark.createDataFrame(rdd, columns).show()

# 6. From a list of dictionaries: the keys become the column names.
spark.createDataFrame(data_dict).show()

                                                                                

+--------+-----------+
|language|users_count|
+--------+-----------+
|    Java|      20000|
|  Python|     100000|
|   Scala|       3000|
+--------+-----------+

+--------+-----------+
|language|users_count|
+--------+-----------+
|    Java|      20000|
|  Python|     100000|
|   Scala|       3000|
+--------+-----------+

+------+------+
|    _1|    _2|
+------+------+
|  Java| 20000|
|Python|100000|
| Scala|  3000|
+------+------+

+--------+-----------+
|language|users_count|
+--------+-----------+
|    Java|      20000|
|  Python|     100000|
|   Scala|       3000|
+--------+-----------+

+--------+-----------+
|language|users_count|
+--------+-----------+
|    Java|      20000|
|  Python|     100000|
|   Scala|       3000|
+--------+-----------+

+---+-------+
|age|   name|
+---+-------+
|  1|  Alice|
| 34|Gabriel|
+---+-------+



#### Dataframe: Manipulação

In [21]:
# Rebind `df` to the customers Parquet dataset; the cells below query this DataFrame.
df = spark.read.parquet("../dataset/parquet_data/olist_customers_dataset.parquet")

In [168]:
# Select columns: keep only the two customer identifiers.
(
    df
    .select('customer_id', 'customer_unique_id')
    .show()
)

+--------------------+--------------------+
|         customer_id|  customer_unique_id|
+--------------------+--------------------+
|06b8999e2fba1a1fb...|861eff4711a542e4b...|
|18955e83d337fd6b2...|290c77bc529b7ac93...|
|4e7b3e00288586ebd...|060e732b5b29e8181...|
|b2b6027bc5c5109e5...|259dac757896d24d7...|
|4f2d8ab171c80ec83...|345ecd01c38d18a90...|
|879864dab9bc30475...|4c93744516667ad3b...|
|fd826e7cf63160e53...|addec96d2e059c80c...|
|5e274e7a0c3809e14...|57b2a98a409812fe9...|
|5adf08e34b2e99398...|1175e95fb47ddff9d...|
|4b7139f34592b3a31...|9afe194fb833f79e3...|
|9fb35e4ed6f0a14a4...|2a7745e1ed516b289...|
|5aa9e4fdd4dfd2095...|2a46fb94aef5cbeeb...|
|b2d1536598b73a9ab...|918dc87cd72cd9f6e...|
|eabebad39a88bb6f5...|295c05e81917928d7...|
|1f1c7bf1c9b041b29...|3151a81801c838636...|
|206f3129c0e4d7d0b...|21f748a16f4e1688a...|
|a7c125a0a07b75146...|5c2991dbd08bbf3cf...|
|c5c61596a3b6bd0ce...|b6e99561fe6f34a55...|
|9b8ce803689b3562d...|7f3a72e8f988c6e73...|
|49d0ea0986edde72d...|3e6fd6b2f0

In [175]:
# Count the records of one column and expose the result as "ct".
(
    df
    .select(count('customer_id').alias("ct"))
    .show()
)

+-----+
|   ct|
+-----+
|99441|
+-----+



In [197]:
# Count records per (state, city) group, order the groups by size
# (largest first) and keep those with at least 1000 records.
(
    df
    .groupBy('customer_state', 'customer_city')
    .agg(count("*").alias("ct"))
    .sort(desc("ct"))
    .where(col("ct") >= 1000)
    .show()
)

+--------------+--------------+-----+
|customer_state| customer_city|   ct|
+--------------+--------------+-----+
|            SP|     sao paulo|15540|
|            RJ|rio de janeiro| 6882|
|            MG|belo horizonte| 2773|
|            DF|      brasilia| 2131|
|            PR|      curitiba| 1521|
|            SP|      campinas| 1444|
|            RS|  porto alegre| 1379|
|            BA|      salvador| 1245|
|            SP|     guarulhos| 1189|
+--------------+--------------+-----+



In [24]:
# Filter the records before aggregating: rows from SP are dropped first,
# then the remaining rows are counted per (state, city), ordered by size
# and limited to groups with at least 1000 records.
(
    df
    .select('customer_state', 'customer_city')
    .where(col('customer_state') != "SP")
    .groupBy('customer_state', 'customer_city')
    .agg(count("*").alias("ct"))
    .sort(desc("ct"))
    .where(col("ct") >= 1000)
    .show()
)

+--------------+--------------+----+
|customer_state| customer_city|  ct|
+--------------+--------------+----+
|            RJ|rio de janeiro|6882|
|            MG|belo horizonte|2773|
|            DF|      brasilia|2131|
|            PR|      curitiba|1521|
|            RS|  porto alegre|1379|
|            BA|      salvador|1245|
+--------------+--------------+----+



In [29]:
# Join the orders (fact) table to the customers (dimension) table and
# count the orders per customer state, largest first.
# No explicit broadcast() hint is applied to this join.
dim_customer = spark.read.parquet(
    "../dataset/parquet_data/olist_customers_dataset.parquet"
)
fact_order = spark.read.parquet(
    "../dataset/parquet_data/olist_orders_dataset.parquet"
)

(
    fact_order
    .join(dim_customer, fact_order.customer_id == dim_customer.customer_id, 'inner')
    .select(
        fact_order.order_id.alias("cupom_id"),
        dim_customer.customer_state,
    )
    .groupBy("customer_state")
    .agg(count("*").alias("ct"))
    .sort(desc("ct"))
    .show()
)

[Stage 53:>                                                         (0 + 6) / 6]

+--------------+-----+
|customer_state|   ct|
+--------------+-----+
|            SP|41746|
|            RJ|12852|
|            MG|11635|
|            RS| 5466|
|            PR| 5045|
|            SC| 3637|
|            BA| 3380|
|            DF| 2140|
|            ES| 2033|
|            GO| 2020|
|            PE| 1652|
|            CE| 1336|
|            PA|  975|
|            MT|  907|
|            MA|  747|
|            MS|  715|
|            PB|  536|
|            PI|  495|
|            RN|  485|
|            AL|  413|
+--------------+-----+
only showing top 20 rows



                                                                                

In [28]:
def _explain_orders_per_state(use_broadcast):
    """Print the physical plan of the orders-per-state aggregation.

    The query joins ``fact_order`` to ``dim_customer`` on ``customer_id``,
    counts the joined rows per ``customer_state`` and sorts the counts in
    descending order. Only the plan is printed; the query is not executed.

    Args:
        use_broadcast: when True, the customer side of the join is wrapped
            in ``broadcast()``; when False, it is passed to the join as-is.
    """
    customers = broadcast(dim_customer) if use_broadcast else dim_customer
    (
        fact_order
        .join(customers, fact_order.customer_id == dim_customer.customer_id, 'inner')
        .select(
            fact_order.order_id.alias("cupom_id"),
            dim_customer.customer_state,
        )
        .groupBy("customer_state")
        .agg(count("*").alias("ct"))
        .sort(desc("ct"))
        .explain()
    )


# With the broadcast hint
_explain_orders_per_state(use_broadcast=True)

# Without the broadcast hint
_explain_orders_per_state(use_broadcast=False)

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Sort [ct#856L DESC NULLS LAST], true, 0
   +- Exchange rangepartitioning(ct#856L DESC NULLS LAST, 200), ENSURE_REQUIREMENTS, [plan_id=1230]
      +- HashAggregate(keys=[customer_state#545], functions=[count(1)])
         +- Exchange hashpartitioning(customer_state#545, 200), ENSURE_REQUIREMENTS, [plan_id=1227]
            +- HashAggregate(keys=[customer_state#545], functions=[partial_count(1)])
               +- Project [customer_state#545]
                  +- BroadcastHashJoin [customer_id#552], [customer_id#541], Inner, BuildRight, false
                     :- Filter isnotnull(customer_id#552)
                     :  +- FileScan parquet [customer_id#552] Batched: true, DataFilters: [isnotnull(customer_id#552)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/gabrielbossardi/Documents/Projects/pyspark-overview/datase..., PartitionFilters: [], PushedFilters: [IsNotNull(customer_id)], ReadSchema: struct<custo

In [39]:
# union: compare three row counts -
#   1. the customers table on its own,
#   2. the table unioned with itself (duplicates are kept),
#   3. the same union after distinct() (duplicates are removed).
customers_twice = dim_customer.union(dim_customer)

dim_customer.select(count("*").alias("ct")).show()
customers_twice.agg(count("*").alias("ct")).show()
customers_twice.distinct().agg(count("*").alias("ct")).show()

+-----+
|   ct|
+-----+
|99441|
+-----+

+------+
|    ct|
+------+
|198882|
+------+

+-----+
|   ct|
+-----+
|99441|
+-----+



In [91]:
# Distinct (state, city) pairs found in the customers table.
df_states = (
    dim_customer
    .select("customer_state", "customer_city")
    .distinct()
)
df_states.show()

# Drop to the RDD API to rewrite each pair: the state is kept unchanged and
# the city is replaced by a sentence naming both the city and the state.
dim_city = df_states.rdd.map(
    lambda row: (row[0], f'Cidade: {row[1]}, do Estado de: {row[0]}')
)
dim_city.toDF(["customer_state", "customer_city"]).show(truncate=False)

+--------------+--------------------+
|customer_state|       customer_city|
+--------------+--------------------+
|            SP|         carapicuiba|
|            PR|            marialva|
|            SC|            saudades|
|            SP|            guaicara|
|            SP|             ubatuba|
|            RO|sao francisco do ...|
|            PR|           pranchita|
|            BA|             sapeacu|
|            PE|               xexeu|
|            MG|   santana da vargem|
|            SP|  santa rita d'oeste|
|            MA|              grajau|
|            MS|          deodapolis|
|            MG|         montalvania|
|            MG|  ponto do marambaia|
|            PI|     nova santa rita|
|            SP|            boraceia|
|            MS|              tacuru|
|            MG|           ituiutaba|
|            GO|               jatai|
+--------------+--------------------+
only showing top 20 rows

+--------------+----------------------------------------------

In [102]:
# Same kind of rewrite with the DataFrame API: overwrite customer_city in
# the result with the literal prefix 'Cidade: ' followed by the city name.
df_states.withColumn(
    "customer_city",
    concat(lit('Cidade: '), col("customer_city")),
).show()

+--------------+--------------------+
|customer_state|       customer_city|
+--------------+--------------------+
|            SP| Cidade: carapicuiba|
|            PR|    Cidade: marialva|
|            SC|    Cidade: saudades|
|            SP|    Cidade: guaicara|
|            SP|     Cidade: ubatuba|
|            RO|Cidade: sao franc...|
|            PR|   Cidade: pranchita|
|            BA|     Cidade: sapeacu|
|            PE|       Cidade: xexeu|
|            MG|Cidade: santana d...|
|            SP|Cidade: santa rit...|
|            MA|      Cidade: grajau|
|            MS|  Cidade: deodapolis|
|            MG| Cidade: montalvania|
|            MG|Cidade: ponto do ...|
|            PI|Cidade: nova sant...|
|            SP|    Cidade: boraceia|
|            MS|      Cidade: tacuru|
|            MG|   Cidade: ituiutaba|
|            GO|       Cidade: jatai|
+--------------+--------------------+
only showing top 20 rows



In [119]:
# Print the cities of the state of SP from the driver.
# foreach() runs its function on the executors, so print() output only
# reaches the notebook when Spark runs in local mode. Collecting the
# (small) filtered result first and printing it here works on any deployment.
for row in df_states.where(col("customer_state") == "SP").collect():
    print(f'Cidade: {row[1]}')

Cidade: carapicuiba
Cidade: guaicara
Cidade: ubatuba
Cidade: santa rita d'oeste
Cidade: boraceia
Cidade: aracariguama
Cidade: borborema
Cidade: campo limpo paulista
Cidade: sao caetano do sul
Cidade: taubate
Cidade: dracena
Cidade: juquia
Cidade: urania
Cidade: elias fausto
Cidade: sarapui
Cidade: dobrada
Cidade: santa barbara d oeste
Cidade: porangaba
Cidade: guariroba
Cidade: guariba
Cidade: itatiba
Cidade: nova odessa
Cidade: pirapozinho
Cidade: piquerobi
Cidade: campinas
Cidade: porto ferreira
Cidade: adolfo
Cidade: tres fronteiras
Cidade: santa cruz das palmeiras
Cidade: bandeirantes d'oeste
Cidade: ferraz de vasconcelos
Cidade: aguai
Cidade: cipo-guacu
Cidade: matao
Cidade: palestina
Cidade: paranapanema
Cidade: capela do alto
Cidade: coroados
Cidade: sao paulo
Cidade: birigui
Cidade: urupes
Cidade: rio das pedras
Cidade: rubineia
Cidade: olimpia
Cidade: sao miguel arcanjo
Cidade: torrinha
Cidade: promissao
Cidade: emilianopolis
Cidade: aramina
Cidade: jeriquara
Cidade: pedreira


#### SQL

In [225]:
# Register the DataFrame as a temporary view so it can be queried with SQL
# under the name TEMP_DF.
df.createOrReplaceTempView("TEMP_DF")

# Count rows per (state, city), keep the groups with at least 1000 rows and
# order them by size, largest first.
# GROUP BY 1, 2 refers to the first two select-list columns by position.
query = """
SELECT
    customer_state,
    customer_city,
    COUNT(*) AS ct
FROM TEMP_DF
GROUP BY 1, 2
HAVING ct >= 1000
ORDER BY ct DESC
"""

# Run the query through the SparkSession and preview the result.
spark.sql(query).show()



+--------------+--------------+-----+
|customer_state| customer_city|   ct|
+--------------+--------------+-----+
|            SP|     sao paulo|15540|
|            RJ|rio de janeiro| 6882|
|            MG|belo horizonte| 2773|
|            DF|      brasilia| 2131|
|            PR|      curitiba| 1521|
|            SP|      campinas| 1444|
|            RS|  porto alegre| 1379|
|            BA|      salvador| 1245|
|            SP|     guarulhos| 1189|
+--------------+--------------+-----+



                                                                                

#### RDD: Criação

In [None]:
# Create an RDD from the local `data` list, then view it as a DataFrame
# with the names in `columns`.
rdd = spark.sparkContext.parallelize(data)
rdd.toDF(columns).show()

In [99]:
# Create an RDD of Row objects: Person is a Row factory with the fields
# 'code' and 'count', applied to each (language, users_count) pair.
Person = Row('code', 'count')
rdd2 = rdd.map(lambda pair: Person(*pair))

# Bring the rows back to the driver to display them.
rdd2.collect()

[Row(code='Java', count='20000'),
 Row(code='Python', count='100000'),
 Row(code='Scala', count='3000')]

#### RDD: Transformação

In [108]:
# Transform each Row into a label built from its 'code' field.
rdd2.map(
    lambda person: 'Linguagem: ' + person['code']
).collect()

['Linguagem: Java', 'Linguagem: Python', 'Linguagem: Scala']

In [None]:
# Number of partitions backing the orders DataFrame.
print(fact_order.rdd.getNumPartitions())

# One value per partition: the sum of len(row) over that partition's rows.
# NOTE(review): len(row) is the number of fields in a Row, so each value is
# not a plain row count - confirm this is the intended metric.
fact_order.rdd.mapPartitions(
    lambda rows: [sum(len(record) for record in rows)]
).collect()

#### repartition() and coalesce()

In [None]:
# Target partition count for the example below; tune as needed.
NUM_PARTITIONS = 4

# repartition(n) returns a new DataFrame with exactly n partitions. It
# shuffles the data, so it can both increase and decrease the partition count.
df_repartitioned = df.repartition(NUM_PARTITIONS * 2)

# coalesce(n) returns a new DataFrame with at most n partitions. It merges
# existing partitions without a full shuffle, so it can only reduce the count.
df_coalesced = df_repartitioned.coalesce(NUM_PARTITIONS)

# Show the resulting partition counts.
print(df_repartitioned.rdd.getNumPartitions())
print(df_coalesced.rdd.getNumPartitions())

#### Python 

In [121]:
import pandas as pd

ModuleNotFoundError: No module named 'pandas'