# Init SparkSession

In [1]:
from datetime import datetime
from pyspark import SparkContext, HiveContext
from pyspark.sql import SparkSession, SQLContext

In [2]:
# Build (or reuse) a SparkSession against the standalone master.
# The application name is suffixed with the current timestamp.
spark = (
    SparkSession.builder
    .appName("pyspark-dataframe-demo-{}".format(datetime.today()))
    .master("spark://spark-master:7077")
    .getOrCreate()
)

# SQLContext takes a SparkContext as its first positional argument, not a
# SparkSession; hand it the session's context and bind it to this session.
sqlContext = SQLContext(spark.sparkContext, sparkSession=spark)

# Display the effective Spark configuration as a list of (key, value) pairs.
spark.sparkContext.getConf().getAll()




[('spark.driver.host', 'f115dd85bf2e'),
 ('spark.hadoop.fs.s3a.connection.ssl.enabled', 'false'),
 ('spark.driver.extraJavaOptions',
  '-XX:+IgnoreUnrecognizedVMOptions --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL-UNNAMED --add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED'),
 ('spark.repl.local.jars',
  'file:///usr/local/spark-3.3.2-bin-hadoop3/jars/delta-core_2.12-2.2.0.jar,file:///usr/local/spark-3.3.2-bi

# Create DataFrame

## By loading dataset

In [4]:
# Load the semicolon-delimited CSV from the warehouse bucket; the first row is
# used as the header.
# NOTE(review): this file is a consumption dataset, yet the frame is named
# `df_orders` and is later saved as `olist_orders_dataset` — confirm the
# intended source file.
df_orders = (
    spark.read
    .format("csv")
    .options(delimiter=";", header=True)
    .load("s3a://warehouse/conso-elec-gaz-annuelle-par-naf-agregee-region.csv")
)

# Preview the first 20 rows as a pandas DataFrame for rich display.
df_orders.limit(20).toPandas()

Unnamed: 0,operateur,annee,filiere,code_categorie_consommation,libelle_categorie_consommation,code_grand_secteur,libelle_grand_secteur,code_naf,libelle_secteur_naf2,conso,pdl,indqual,nombre_mailles_secretisees,code_region,libelle_region
0,Régie intercommunale d'électricité de Niederbr...,2021,Electricité,PRO,Petits professionels,A,Agriculture,,0,115.362,9,0.491,0,44,Grand Est
1,Régie Municipale Bazas-Energies,2021,Electricité,RES,Résidentiel,R,Résidentiel,,0,12471.471,2634,0.00721,0,75,Nouvelle-Aquitaine
2,Régie municipale d'électricité d'Arignac,2021,Electricité,ENT,Entreprises,T,Tertiaire,94.0,Activités des organisations associatives,493.0,1,1.0,0,76,Occitanie
3,Régie municipale d'électricité d'Arignac,2021,Electricité,PRO,Petits professionels,I,Industrie,,0,0.0,0,0.0,1,76,Occitanie
4,Régie municipale d'électricité d'Arignac,2021,Electricité,PRO,Petits professionels,T,Tertiaire,,0,385.0,52,1.0,0,76,Occitanie
5,Régie municipale d'électricité de l'Hospitalet...,2021,Electricité,ENT,Entreprises,T,Tertiaire,47.0,"Commerce de détail, à l'exception des automobi...",123.0,1,1.0,0,76,Occitanie
6,Régie municipale d'électricité de l'Hospitalet...,2021,Electricité,RES,Résidentiel,R,Résidentiel,,0,256.0,83,1.0,0,76,Occitanie
7,Régie municipale d'électricité de La Bresse,2021,Electricité,ENT,Entreprises,T,Tertiaire,85.0,Enseignement,80.673,1,1.0,0,44,Grand Est
8,Régie municipale d'électricité de La Bresse,2021,Electricité,ENT,Entreprises,T,Tertiaire,93.0,"Activités sportives, récréatives et de loisirs",72.914,1,1.0,0,44,Grand Est
9,Régie municipale d'électricité de La Bresse,2021,Electricité,ENT,Entreprises,T,Tertiaire,96.0,Autres services personnels,613.007,1,0.867,0,44,Grand Est


In [4]:
# version 1: full records
# Overwrite the Delta table stored at the warehouse path with every row of
# df_orders and save it under the table name `olist_orders_dataset`.
(
    df_orders.write
    .format("delta")
    .mode("overwrite")
    .options(
        compression="snappy",
        path="s3a://warehouse/olist_orders_dataset.delta",
    )
    .saveAsTable("olist_orders_dataset")
)

In [5]:
# version 2: limit 10 records
# Overwrite the same Delta table again, this time with only 10 rows.
(
    df_orders.limit(10).write
    .format("delta")
    .mode("overwrite")
    .options(
        compression="snappy",
        path="s3a://warehouse/olist_orders_dataset.delta",
    )
    .saveAsTable("olist_orders_dataset")
)

In [6]:
# select version 1
# Time-travel read of the Delta path: versionAsOf=0 selects the first
# committed version of the table; count its rows.
(
    spark.read
    .format("delta")
    .option("versionAsOf", 0)
    .load("s3a://warehouse/olist_orders_dataset.delta")
    .count()
)

99441

In [8]:
# select version 2
# Time-travel read of the Delta path: versionAsOf=1 selects the second
# committed version of the table; count its rows.
(
    spark.read
    .format("delta")
    .option("versionAsOf", 1)
    .load("s3a://warehouse/olist_orders_dataset.delta")
    .count()
)

10

# Spark SQL

In [9]:
# List the databases (namespaces) visible to this session.
(
    spark.sql("show databases")
    .show()
)

+---------+
|namespace|
+---------+
|  default|
+---------+



In [10]:
# List the tables in the current database.
(
    spark.sql("show tables")
    .show()
)

+---------+--------------------+-----------+
|namespace|           tableName|isTemporary|
+---------+--------------------+-----------+
|  default|olist_orders_dataset|      false|
+---------+--------------------+-----------+



In [11]:
# Query the saved table by name and print the default preview of its rows.
(
    spark.sql("SELECT * FROM olist_orders_dataset")
    .show()
)

+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+
|            order_id|         customer_id|order_status|order_purchase_timestamp|  order_approved_at|order_delivered_carrier_date|order_delivered_customer_date|order_estimated_delivery_date|
+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+
|e481f51cbdc54678b...|9ef432eb625129730...|   delivered|     2017-10-02 10:56:33|2017-10-02 11:07:15|         2017-10-04 19:55:00|          2017-10-10 21:25:13|          2017-10-18 00:00:00|
|53cdb2fc8bc7dce0b...|b0830fb4747a6c6d2...|   delivered|     2018-07-24 20:41:37|2018-07-26 03:24:27|         2018-07-26 14:31:00|          2018-08-07 15:27:45|          2018-08-13 00:00:00|
|47770eb9100c2d0c4...|41ce2a54c0b03bf34...|  

In [13]:
# select version 1
# SQL time travel: count the rows of the table as of Delta version 0.
(
    spark.sql("SELECT COUNT(*) AS num_records FROM olist_orders_dataset VERSION AS OF 0")
    .show()
)

+-----------+
|num_records|
+-----------+
|      99441|
+-----------+



In [14]:
# select version 2
# SQL time travel: count the rows of the table as of Delta version 1.
(
    spark.sql("SELECT COUNT(*) AS num_records FROM olist_orders_dataset VERSION AS OF 1")
    .show()
)

+-----------+
|num_records|
+-----------+
|         10|
+-----------+

