## Chapter 7 and 8

In [4]:
import findspark
findspark.init()
findspark.find()
import pyspark
findspark.find()

'C:\\spark\\spark-3.0.0-preview2-bin-hadoop2.7'

In [5]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
conf = pyspark.SparkConf().setAppName('appName').setMaster('local')
sc = pyspark.SparkContext(conf=conf)
spark = SparkSession(sc)

In [6]:
df = spark.read.format("csv")\
.option("header","true")\
.option("inferSchema","true")\
.load("C:/Users/pilla/Documents/Spark-The-Definitive-Guide-master/Spark-The-Definitive-Guide-master/data/retail-data/by-day/*.csv")\
.coalesce(5)
df.cache()
df.createOrReplaceTempView("dfTable")

#### Count

In [7]:
df.count()

541909

In [9]:
from pyspark.sql.functions import count
df.select(count("StockCode")).show()

+----------------+
|count(StockCode)|
+----------------+
|          541909|
+----------------+



#### Count Distinct

In [14]:
from pyspark.sql.functions import countDistinct
df.select(countDistinct("StockCode")).show()

+-------------------------+
|count(DISTINCT StockCode)|
+-------------------------+
|                     4070|
+-------------------------+



#### Approx_count_distinct

In [15]:
from pyspark.sql.functions import approx_count_distinct
df.select(approx_count_distinct("StockCode",0.05)).show()

+--------------------------------+
|approx_count_distinct(StockCode)|
+--------------------------------+
|                            3804|
+--------------------------------+



#### First and Last

In [16]:
from pyspark.sql.functions import first, last
df.select(first("StockCode"),last("StockCode")).show()

+-----------------------+----------------------+
|first(StockCode, false)|last(StockCode, false)|
+-----------------------+----------------------+
|                  23084|                 22168|
+-----------------------+----------------------+



#### Min and Max

In [17]:
from pyspark.sql.functions import min,max
df.select(min("Quantity"),max("Quantity")).show()

+-------------+-------------+
|min(Quantity)|max(Quantity)|
+-------------+-------------+
|       -80995|        80995|
+-------------+-------------+



#### Sum

In [19]:
from pyspark.sql.functions import sum
df.select(sum("Quantity")).show()

+-------------+
|sum(Quantity)|
+-------------+
|      5176450|
+-------------+



#### Sum Distinct

sum distinct set of values

In [20]:
from pyspark.sql.functions import sumDistinct
df.select(sumDistinct("Quantity")).show()

+----------------------+
|sum(DISTINCT Quantity)|
+----------------------+
|                 29310|
+----------------------+



#### Average

In [25]:
from pyspark.sql.functions import avg, expr

df.select(
count("Quantity").alias("total_transactions"),
sum("Quantity").alias("total_purchases"),
avg("Quantity").alias("avg_purchase"),
expr("mean(Quantity)").alias("mean_purchases"))\
.selectExpr("total_purchases/total_transactions","avg_purchase","mean_purchases").show()

+--------------------------------------+----------------+----------------+
|(total_purchases / total_transactions)|    avg_purchase|  mean_purchases|
+--------------------------------------+----------------+----------------+
|                      9.55224954743324|9.55224954743324|9.55224954743324|
+--------------------------------------+----------------+----------------+



#### Variance and Standard Deviation 

In [26]:
from pyspark.sql.functions import var_pop,stddev_pop
from pyspark.sql.functions import var_samp,stddev_samp
df.select(var_pop("Quantity"),var_samp("Quantity"),
         stddev_samp("Quantity"),stddev_pop("Quantity")).show()

+-----------------+------------------+---------------------+--------------------+
|var_pop(Quantity)|var_samp(Quantity)|stddev_samp(Quantity)|stddev_pop(Quantity)|
+-----------------+------------------+---------------------+--------------------+
|47559.30364660885| 47559.39140929855|    218.0811578502337|   218.0809566344775|
+-----------------+------------------+---------------------+--------------------+



#### Skewness and Kurtosis

In [27]:
from pyspark.sql.functions import skewness, kurtosis

df.select(skewness("Quantity"), kurtosis("Quantity")).show()

+------------------+------------------+
|skewness(Quantity)|kurtosis(Quantity)|
+------------------+------------------+
|-0.264075576105286|119768.05495534562|
+------------------+------------------+



#### Covariance and Correlation

In [29]:
from pyspark.sql.functions import corr,covar_pop,covar_samp
df.select(corr("InvoiceNo","Quantity"), covar_samp("InVoiceNo","Quantity"), covar_pop("InvoiceNo","Quantity")).show()

+-------------------------+-------------------------------+------------------------------+
|corr(InvoiceNo, Quantity)|covar_samp(InVoiceNo, Quantity)|covar_pop(InvoiceNo, Quantity)|
+-------------------------+-------------------------------+------------------------------+
|     4.912186085617365E-4|             1052.7280543863135|            1052.7260778702093|
+-------------------------+-------------------------------+------------------------------+



#### Aggregating to Complex Types (Quite Useful)

In [30]:
from pyspark.sql.functions import collect_list, collect_set
df.agg(collect_list("Country"), collect_set("Country")).show()

+---------------------+--------------------+
|collect_list(Country)|collect_set(Country)|
+---------------------+--------------------+
| [United Kingdom, ...|[Portugal, Italy,...|
+---------------------+--------------------+



In [31]:
from pyspark.sql.functions import count
df.groupBy("InvoiceNo").agg(
count("Quantity").alias("quan"),
expr("count(Quantity)")).show()

+---------+----+---------------+
|InvoiceNo|quan|count(Quantity)|
+---------+----+---------------+
|   574966|   8|              8|
|   575091|  38|             38|
|   578057|  28|             28|
|   537252|   1|              1|
|   578459|   8|              8|
|  C578132|   1|              1|
|   578292|  72|             72|
|   576112|  20|             20|
|   577022|  38|             38|
|   574592|   8|              8|
|  C576393|   2|              2|
|   577511|  46|             46|
|   577541|  21|             21|
|   580739|   2|              2|
|   580906|   4|              4|
|   573726|   1|              1|
|   575671|  20|             20|
|   570264|   1|              1|
|   570281|   3|              3|
|   569823|  69|             69|
+---------+----+---------------+
only showing top 20 rows



In [32]:
df.groupBy("InvoiceNo").agg(expr("avg(Quantity)"),expr("stddev_pop(Quantity)")).show()

+---------+------------------+--------------------+
|InvoiceNo|     avg(Quantity)|stddev_pop(Quantity)|
+---------+------------------+--------------------+
|   574966|               6.0|   3.640054944640259|
|   575091|11.552631578947368|   5.008925551458656|
|   578057| 4.607142857142857|   8.755974636597271|
|   537252|              31.0|                 0.0|
|   578459|              28.0|                26.0|
|  C578132|              -1.0|                 0.0|
|   578292| 5.902777777777778|   8.759375488618884|
|   576112|              10.9|  7.4959989327640635|
|   577022| 5.131578947368421|   2.903455768848916|
|   574592|              7.25|  4.4651427748729375|
|  C576393|              -3.5|                 2.5|
|   577511|3.1739130434782608|  5.4025128928727195|
|   577541| 9.333333333333334|    9.18245393767158|
|   580739|               2.5|                 0.5|
|   580906|              27.0|  13.076696830622021|
|   573726|             -67.0|                 0.0|
|   575671| 

#### Window Function

In [80]:
from pyspark.sql.functions import col,to_date
dfWithDate = df.withColumn("date",to_date(col("InvoiceDate")))
dfWithDate.createOrReplaceTempView("dfWithDate")

In [81]:
from pyspark.sql.window import Window
from pyspark.sql.functions import desc
windowSpec = Window\
.partitionBy("CustomerId","date")\
.orderBy(desc("Quantity"))\
.rowsBetween(Window.unboundedPreceding, Window.currentRow)

#### Max purchasing Quantity

In [82]:
from pyspark.sql.functions import max
maxPurchaseQuantity = max(col("Quantity")).over(windowSpec)

In [83]:
from pyspark.sql.functions import dense_rank, rank
purchaseDenseRank = dense_rank().over(windowSpec)
purchaseRank = rank().over(windowSpec)

In [84]:
dfWithDate.where("CustomerId IS NOT NULL").orderBy("CustomerId")\
.select(
col("CustomerId"),
col("date"),
col("Quantity"),
purchaseRank.alias("quantityRank"),
purchaseDenseRank.alias("quantityDenseRank"),
maxPurchaseQuantity.alias("maxPurchaseQuantity")).show()

+----------+----------+--------+------------+-----------------+-------------------+
|CustomerId|      date|Quantity|quantityRank|quantityDenseRank|maxPurchaseQuantity|
+----------+----------+--------+------------+-----------------+-------------------+
|   12395.0|2011-03-23|     100|           1|                1|                100|
|   12395.0|2011-03-23|     100|           1|                1|                100|
|   12395.0|2011-03-23|      48|           3|                2|                100|
|   12395.0|2011-03-23|      20|           4|                3|                100|
|   12395.0|2011-03-23|      12|           5|                4|                100|
|   12395.0|2011-03-23|      12|           5|                4|                100|
|   12395.0|2011-03-23|      10|           7|                5|                100|
|   12395.0|2011-03-23|      10|           7|                5|                100|
|   12395.0|2011-03-23|       6|           9|                6|             

In [85]:
dfNoNull = dfWithDate.drop()
dfNoNull.createOrReplaceTempView("dfNoNull")

#### Grouping Sets

In [86]:
result = spark.sql("""SELECT CustomerId, stockCode, sum(Quantity) FROM dfNoNull
GROUP BY CustomerId,stockCode GROUPING SETS((CustomerId,stockCode))
ORDER BY CustomerId DESC, stockCode DESC""")
result.show()

+----------+---------+-------------+
|CustomerId|stockCode|sum(Quantity)|
+----------+---------+-------------+
|   18287.0|    85173|           48|
|   18287.0|   85040A|           48|
|   18287.0|   85039B|          120|
|   18287.0|   85039A|           96|
|   18287.0|    84920|            4|
|   18287.0|    84584|            6|
|   18287.0|   84507C|            6|
|   18287.0|   72351B|           24|
|   18287.0|   72351A|           24|
|   18287.0|   72349B|           60|
|   18287.0|    47422|           24|
|   18287.0|    47421|           48|
|   18287.0|    35967|           36|
|   18287.0|    23445|           20|
|   18287.0|    23378|           24|
|   18287.0|    23376|           48|
|   18287.0|    23310|           36|
|   18287.0|    23274|           12|
|   18287.0|    23272|           12|
|   18287.0|    23269|           36|
+----------+---------+-------------+
only showing top 20 rows



#### Roll Up

In [87]:
rolledUpDF = dfNoNull.rollup("date","Country").agg(sum("Quantity"))\
.selectExpr("date","Country","`sum(Quantity)` as total_quantity")\
.orderBy("date")
rolledUpDF.show()

+----------+--------------+--------------+
|      date|       Country|total_quantity|
+----------+--------------+--------------+
|      null|          null|       5176450|
|2010-12-01|          null|         26814|
|2010-12-01|     Australia|           107|
|2010-12-01|        France|           449|
|2010-12-01|United Kingdom|         23949|
|2010-12-01|          EIRE|           243|
|2010-12-01|        Norway|          1852|
|2010-12-01|   Netherlands|            97|
|2010-12-01|       Germany|           117|
|2010-12-02|       Germany|           146|
|2010-12-02|          EIRE|             4|
|2010-12-02|          null|         21023|
|2010-12-02|United Kingdom|         20873|
|2010-12-03|        Poland|           140|
|2010-12-03|        France|           239|
|2010-12-03|       Germany|           170|
|2010-12-03|      Portugal|            65|
|2010-12-03|         Spain|           400|
|2010-12-03|   Switzerland|           110|
|2010-12-03|       Belgium|           528|
+----------

#### Cube

In [88]:
dfNoNull.cube("date","Country").agg(sum(col("Quantity")))\
.select("Date","Country","sum(Quantity)").orderBy("date").show()

+----+--------------------+-------------+
|Date|             Country|sum(Quantity)|
+----+--------------------+-------------+
|null|               Japan|        25218|
|null|            Portugal|        16180|
|null|           Australia|        83653|
|null|             Germany|       117448|
|null|                 RSA|          352|
|null|              Cyprus|         6317|
|null|           Hong Kong|         4769|
|null|         Unspecified|         3300|
|null|           Singapore|         5234|
|null|                null|      5176450|
|null|     Channel Islands|         9479|
|null|             Finland|        10666|
|null|             Denmark|         8188|
|null|               Spain|        26824|
|null|             Lebanon|          386|
|null|  European Community|          497|
|null|United Arab Emirates|          982|
|null|              Norway|        19247|
|null|      Czech Republic|          592|
|null|                 USA|         1034|
+----+--------------------+-------

#### Pivot

In [90]:
pivoted = dfWithDate.groupBy("date").pivot("Country").sum()

## Joins

In [95]:
person = spark.createDataFrame([
    (0,"Bill Chambers",0,[100]),
    (1,"Matet Zaharia",1,[500,200,100]),
    (2,"Michael Armbrust",1,[250,100])])\
.toDF("id","name","graduate_program","spark_status")
graduateProgram = spark.createDataFrame([
    (0,"Masters","School of Information","UC Berkely"),
    (2,"Masters","EECS","UC Berkeley"),
    (1,"Phd","EECS","UC Berkeley")])\
.toDF("id","degree","department","school")
sparkStatus = spark.createDataFrame([
    (500,"Vice President"),
    (250,"PMC Member"),
    (100,"Contributor")])\
.toDF("id","status")

#### Inner Join

In [98]:
person.show()

+---+----------------+----------------+---------------+
| id|            name|graduate_program|   spark_status|
+---+----------------+----------------+---------------+
|  0|   Bill Chambers|               0|          [100]|
|  1|   Matet Zaharia|               1|[500, 200, 100]|
|  2|Michael Armbrust|               1|     [250, 100]|
+---+----------------+----------------+---------------+



In [99]:
graduateProgram.show()

+---+-------+--------------------+-----------+
| id| degree|          department|     school|
+---+-------+--------------------+-----------+
|  0|Masters|School of Informa...| UC Berkely|
|  2|Masters|                EECS|UC Berkeley|
|  1|    Phd|                EECS|UC Berkeley|
+---+-------+--------------------+-----------+



In [96]:
joinExpression = person["graduate_program"] == graduateProgram["id"]
person.join(graduateProgram,joinExpression,"inner").show()

+---+----------------+----------------+---------------+---+-------+--------------------+-----------+
| id|            name|graduate_program|   spark_status| id| degree|          department|     school|
+---+----------------+----------------+---------------+---+-------+--------------------+-----------+
|  0|   Bill Chambers|               0|          [100]|  0|Masters|School of Informa...| UC Berkely|
|  1|   Matet Zaharia|               1|[500, 200, 100]|  1|    Phd|                EECS|UC Berkeley|
|  2|Michael Armbrust|               1|     [250, 100]|  1|    Phd|                EECS|UC Berkeley|
+---+----------------+----------------+---------------+---+-------+--------------------+-----------+



#### Outer Join

In [97]:
person.join(graduateProgram, joinExpression, "outer").show()

+----+----------------+----------------+---------------+---+-------+--------------------+-----------+
|  id|            name|graduate_program|   spark_status| id| degree|          department|     school|
+----+----------------+----------------+---------------+---+-------+--------------------+-----------+
|   0|   Bill Chambers|               0|          [100]|  0|Masters|School of Informa...| UC Berkely|
|   1|   Matet Zaharia|               1|[500, 200, 100]|  1|    Phd|                EECS|UC Berkeley|
|   2|Michael Armbrust|               1|     [250, 100]|  1|    Phd|                EECS|UC Berkeley|
|null|            null|            null|           null|  2|Masters|                EECS|UC Berkeley|
+----+----------------+----------------+---------------+---+-------+--------------------+-----------+



#### Left Join

In [108]:
graduateProgram.join(person, joinExpression, "left_outer").show()

+---+-------+--------------------+-----------+----+----------------+----------------+---------------+
| id| degree|          department|     school|  id|            name|graduate_program|   spark_status|
+---+-------+--------------------+-----------+----+----------------+----------------+---------------+
|  0|Masters|School of Informa...| UC Berkely|   0|   Bill Chambers|               0|          [100]|
|  1|    Phd|                EECS|UC Berkeley|   1|   Matet Zaharia|               1|[500, 200, 100]|
|  1|    Phd|                EECS|UC Berkeley|   2|Michael Armbrust|               1|     [250, 100]|
|  2|Masters|                EECS|UC Berkeley|null|            null|            null|           null|
+---+-------+--------------------+-----------+----+----------------+----------------+---------------+



#### Right Join

In [107]:
person.join(graduateProgram, joinExpression, "right_outer").show()

+----+----------------+----------------+---------------+---+-------+--------------------+-----------+
|  id|            name|graduate_program|   spark_status| id| degree|          department|     school|
+----+----------------+----------------+---------------+---+-------+--------------------+-----------+
|   0|   Bill Chambers|               0|          [100]|  0|Masters|School of Informa...| UC Berkely|
|   1|   Matet Zaharia|               1|[500, 200, 100]|  1|    Phd|                EECS|UC Berkeley|
|   2|Michael Armbrust|               1|     [250, 100]|  1|    Phd|                EECS|UC Berkeley|
|null|            null|            null|           null|  2|Masters|                EECS|UC Berkeley|
+----+----------------+----------------+---------------+---+-------+--------------------+-----------+



#### Left Semi Join

In [113]:
gradProgram2 = graduateProgram.union(spark.createDataFrame([
    (0,"Masters","Duplicated Row","Duplicated School")]))
gradProgram2.createOrReplaceTempView("gradProgram2")

In [118]:
exprs = gradProgram2["id"]== person["graduate_program"]
gradProgram2.join(person, exprs, "left_semi").show()

+---+-------+--------------------+-----------------+
| id| degree|          department|           school|
+---+-------+--------------------+-----------------+
|  0|Masters|School of Informa...|       UC Berkely|
|  1|    Phd|                EECS|      UC Berkeley|
|  0|Masters|      Duplicated Row|Duplicated School|
+---+-------+--------------------+-----------------+



#### Left Anti Join

In [119]:
gradProgram2.join(person, exprs, "left_anti").show()

+---+-------+----------+-----------+
| id| degree|department|     school|
+---+-------+----------+-----------+
|  2|Masters|      EECS|UC Berkeley|
+---+-------+----------+-----------+



#### Natural Join

In [122]:
person.createOrReplaceTempView("person")
graduateProgram.createOrReplaceTempView("graduateProgram")
spark.sql("""SELECT * FROM graduateProgram NATURAL JOIN person""").show()

+---+-------+--------------------+-----------+----------------+----------------+---------------+
| id| degree|          department|     school|            name|graduate_program|   spark_status|
+---+-------+--------------------+-----------+----------------+----------------+---------------+
|  0|Masters|School of Informa...| UC Berkely|   Bill Chambers|               0|          [100]|
|  1|    Phd|                EECS|UC Berkeley|   Matet Zaharia|               1|[500, 200, 100]|
|  2|Masters|                EECS|UC Berkeley|Michael Armbrust|               1|     [250, 100]|
+---+-------+--------------------+-----------+----------------+----------------+---------------+



#### Cross Join

In [123]:
graduateProgram.join(person, joinExpression, "cross").show()

+---+-------+--------------------+-----------+---+----------------+----------------+---------------+
| id| degree|          department|     school| id|            name|graduate_program|   spark_status|
+---+-------+--------------------+-----------+---+----------------+----------------+---------------+
|  0|Masters|School of Informa...| UC Berkely|  0|   Bill Chambers|               0|          [100]|
|  1|    Phd|                EECS|UC Berkeley|  1|   Matet Zaharia|               1|[500, 200, 100]|
|  1|    Phd|                EECS|UC Berkeley|  2|Michael Armbrust|               1|     [250, 100]|
+---+-------+--------------------+-----------+---+----------------+----------------+---------------+



In [124]:
person.crossJoin(graduateProgram).show()

+---+----------------+----------------+---------------+---+-------+--------------------+-----------+
| id|            name|graduate_program|   spark_status| id| degree|          department|     school|
+---+----------------+----------------+---------------+---+-------+--------------------+-----------+
|  0|   Bill Chambers|               0|          [100]|  0|Masters|School of Informa...| UC Berkely|
|  0|   Bill Chambers|               0|          [100]|  2|Masters|                EECS|UC Berkeley|
|  0|   Bill Chambers|               0|          [100]|  1|    Phd|                EECS|UC Berkeley|
|  1|   Matet Zaharia|               1|[500, 200, 100]|  0|Masters|School of Informa...| UC Berkely|
|  1|   Matet Zaharia|               1|[500, 200, 100]|  2|Masters|                EECS|UC Berkeley|
|  1|   Matet Zaharia|               1|[500, 200, 100]|  1|    Phd|                EECS|UC Berkeley|
|  2|Michael Armbrust|               1|     [250, 100]|  0|Masters|School of Informa...| UC

#### Enable for cross join "spark.sql.crossJoin.enable"

#### Join on Complex Types

In [129]:
from pyspark.sql.functions import expr
person.withColumnRenamed("id","personId")\
.join(sparkStatus, expr("array_contains(spark_status,id)")).show()

+--------+----------------+----------------+---------------+---+--------------+
|personId|            name|graduate_program|   spark_status| id|        status|
+--------+----------------+----------------+---------------+---+--------------+
|       0|   Bill Chambers|               0|          [100]|100|   Contributor|
|       1|   Matet Zaharia|               1|[500, 200, 100]|500|Vice President|
|       1|   Matet Zaharia|               1|[500, 200, 100]|100|   Contributor|
|       2|Michael Armbrust|               1|     [250, 100]|250|    PMC Member|
|       2|Michael Armbrust|               1|     [250, 100]|100|   Contributor|
+--------+----------------+----------------+---------------+---+--------------+



#### Handling Duplicate Columns

In [130]:
person.join(graduateProgram, joinExpression).select("graduate_program").show()

+----------------+
|graduate_program|
+----------------+
|               0|
|               1|
|               1|
+----------------+

