## Scratchpad - Notebook for developing the PySpark ETL code of the Retail Customer Project

This notebook can be used to interactively try out code snippets in PySpark, Spark SQL, and PyDeequ (data quality checks for Spark).

In [1]:
# Imports and variable settings
import os

# Set the SPARK_VERSION environment variable (needed by PyDeequ) before importing pydeequ
os.environ["SPARK_VERSION"] = "3.3.4"
from pyspark.sql import SparkSession, Row 
import pydeequ 
from pydeequ.analyzers import * 

Now create the Spark session, with the Deequ JAR files included through the session configuration.

In [2]:
# create spark session 
try:
    spark = (SparkSession.builder
                .appName("Retail DE ETL app")
                .enableHiveSupport()
                .master("local[*]")
                .config("spark.sql.adaptive.enabled", "true")
                .config("spark.dynamicAllocation.enabled", "true")
                .config("spark.sql.caseSensitive", "false")
                .config("spark.sql.parquet.writeLegacyFormat", "true")
                .config("spark.sql.sources.partitionOverwriteMode", "dynamic")
                .config("hive.exec.dynamic.partition.mode", "nonstrict")
                .config("spark.shuffle.service.enabled", "true")
                .config("spark.dynamicAllocation.initialExecutors", "0")
                .config("spark.jars.packages", pydeequ.deequ_maven_coord)
                .config("spark.jars.excludes", pydeequ.f2j_maven_coord)
                .getOrCreate() 
    )
    # set the logging level through the Spark context
    sc = spark.sparkContext  # access the Spark context
    sc.setLogLevel("ERROR")  # sets the log level to ERROR (other levels: WARN, INFO, DEBUG, ...)
    print("=" * 150)
    print("PySpark session available through `spark` object") 
except Exception as e:
    print(e)

/opt/spark-3.3.4-bin-hadoop3/bin/load-spark-env.sh: line 68: ps: command not found


:: loading settings :: url = jar:file:/opt/spark-3.3.4-bin-hadoop3/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
com.amazon.deequ#deequ added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-271b26c5-d4a5-4fc4-a336-71103ff85613;1.0
	confs: [default]
	found com.amazon.deequ#deequ;2.0.3-spark-3.3 in central
	found org.scala-lang#scala-reflect;2.12.10 in central
	found org.scalanlp#breeze_2.12;0.13.2 in central
	found org.scalanlp#breeze-macros_2.12;0.13.2 in central
	found com.github.fommil.netlib#core;1.1.2 in central
	found net.sf.opencsv#opencsv;2.3 in central
	found com.github.rwl#jtransforms;2.4.0 in central
	found junit#junit;4.8.2 in central
	found org.apache.commons#commons-math3;3.2 in central
	found org.spire-math#spire_2.12;0.13.0 in central
	found org.spire-math#spire-macros_2.12;0.13.0 in central
	found org.typelevel#machinist_2.12;0.6.1 in central
	found com.chuusai#shapeless_2.12;2.3.2 in central
	found org.typelevel#macro-compat_2.12;1.1.1 in central
	fo

23/12/20 10:45:56 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


PySpark session available through `spark` object


-----------

#### Basic Tests

Create a simple DataFrame and run a few quick PyDeequ analyzers on it to make sure everything works.

In [4]:
spark.sql("SHOW DATABASES").show() # should simply return `default`

+---------+
|namespace|
+---------+
|  default|
+---------+



In [5]:
df = sc.parallelize([
        Row(a="foo", b=1, c=5),
        Row(a="bar", b=2, c=6),
        Row(a="baz", b=3, c=None)
        ]).toDF() 

df.toPandas()  # renders as a pandas DataFrame in the notebook

                                                                                

|   | a   | b | c   |
|---|-----|---|-----|
| 0 | foo | 1 | 5.0 |
| 1 | bar | 2 | 6.0 |
| 2 | baz | 3 |     |


In [6]:
# pydeequ test 
analysisResult = (AnalysisRunner(spark)
        .onData(df)
        .addAnalyzer(Size())
        .addAnalyzer(Completeness("b"))
        .run()
    )

analysisResult_df = AnalyzerContext.successMetricsAsDataFrame(spark, analysisResult)

analysisResult_df.toPandas() 



|   | entity  | instance | name         | value |
|---|---------|----------|--------------|-------|
| 0 | Dataset | *        | Size         | 3.0   |
| 1 | Column  | b        | Completeness | 1.0   |
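
For reference, the two metrics reported here have simple definitions: `Size` is the number of rows, and `Completeness` is the fraction of non-null values in a column. The plain-Python sketch below mirrors those definitions on the same three rows. It is only an illustration, not how Deequ computes the metrics on Spark; the `completeness` helper and the extra check on column `c` are additions for this example and were not part of the notebook run.

```python
def completeness(values):
    """Return the fraction of entries that are not None (0.0 for an empty list)."""
    if not values:
        return 0.0
    non_null = sum(1 for v in values if v is not None)
    return non_null / len(values)


# Same three rows as the test DataFrame, as plain dictionaries
rows = [
    {"a": "foo", "b": 1, "c": 5},
    {"a": "bar", "b": 2, "c": 6},
    {"a": "baz", "b": 3, "c": None},
]

size = len(rows)                                        # counterpart of Size()
completeness_b = completeness([r["b"] for r in rows])   # counterpart of Completeness("b")
completeness_c = completeness([r["c"] for r in rows])   # column c has one null value

print(size, completeness_b, round(completeness_c, 3))   # prints: 3 1.0 0.667
```

Column `b` has no nulls, so its completeness is 1.0, matching the PyDeequ result. By the same definition, column `c` would come out at 2/3 because one of its three values is missing.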


End of tests, everything working as expected!

--------------

### ETL development

- Start by reading some datasets into PySpark DataFrames.
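
As a possible starting point for that step, here is a minimal sketch of a reader helper. The function name `read_dataset`, its defaults, and the example path in the comment are hypothetical and not taken from the project; the helper relies only on the standard `spark.read.format(...).option(...).load(...)` reader chain and expects the `spark` session created earlier to be passed in.

```python
def read_dataset(spark, path, fmt="csv"):
    """Read a dataset at `path` into a PySpark DataFrame (illustrative sketch).

    `spark` is an existing SparkSession; `fmt` is any format name that
    DataFrameReader accepts, such as "csv", "parquet", or "json".
    """
    reader = spark.read.format(fmt)
    if fmt == "csv":
        # Assumption: CSV files have a header row; schema inference is
        # convenient in a scratchpad but costs an extra pass over the data.
        reader = reader.option("header", "true").option("inferSchema", "true")
    return reader.load(path)


# Hypothetical usage inside the notebook (the path is a placeholder):
# customers_df = read_dataset(spark, "data/customers.csv")
# customers_df.printSchema()
```

For the actual ETL code, an explicit schema is usually a safer choice than `inferSchema`, since it avoids type surprises between runs.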

### End of scratchpad - end spark session

In [7]:
# end of notebook, stop spark session 
spark.stop() 