In [0]:
import warnings
warnings.filterwarnings("ignore")

In [0]:
from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = 'all'

In [0]:
dbutils.library.restartPython()

In [0]:
from pyspark.sql.types import *
import pyspark.sql.functions as F
from pyspark.sql import SparkSession

#####Read transaction files

In [0]:
transactions_file = "dbfs:/FileStore/transaction"
df_transactions = spark.read.parquet(transactions_file)
df_transactions.display()

cust_id,start_date,end_date,txn_id,date,year,month,day,expense_type,amt,city
C0YDPQWPBJ,2010-07-01,2018-12-01,TZ5SMKZY9S03OQJ,2018-10-07,2018,10,7,Entertainment,10.42,boston
C0YDPQWPBJ,2010-07-01,2018-12-01,TYIAPPNU066CJ5R,2016-03-27,2016,3,27,Motor/Travel,44.34,portland
C0YDPQWPBJ,2010-07-01,2018-12-01,TETSXIK4BLXHJ6W,2011-04-11,2011,4,11,Entertainment,3.18,chicago
C0YDPQWPBJ,2010-07-01,2018-12-01,TQKL1QFJY3EM8LO,2018-02-22,2018,2,22,Groceries,268.97,los_angeles
C0YDPQWPBJ,2010-07-01,2018-12-01,TYL6DFP09PPXMVB,2010-10-16,2010,10,16,Entertainment,2.66,chicago
C0YDPQWPBJ,2010-07-01,2018-12-01,T1SMX9EUG21BBSE,2015-02-11,2015,2,11,Education,54.14,portland
C0YDPQWPBJ,2010-07-01,2018-12-01,T449R5YAV3BMX7O,2012-11-14,2012,11,14,Gambling,88.34,seattle
C0YDPQWPBJ,2010-07-01,2018-12-01,TZHEHBKEXF1TPS2,2016-11-19,2016,11,19,Groceries,95.69,philadelphia
C0YDPQWPBJ,2010-07-01,2018-12-01,TVYNJ1ZYAZI6O6J,2013-05-02,2013,5,2,Gambling,50.53,portland
C0YDPQWPBJ,2010-07-01,2018-12-01,TMRZVDMBXYVOYSH,2011-09-03,2011,9,3,Tax,228.39,los_angeles


In [0]:
df_transactions.count()

Out[3]: 39790092

In [0]:
df_transactions.rdd.getNumPartitions()

Out[4]: 12

######Get number of records per partition

In [0]:
display(
    df_transactions.withColumn('partition_id',F.spark_partition_id())
    .groupBy('partition_id').count().alias('no_recs')
)

partition_id,count
0,3592625
1,3663839
2,3454630
3,3657055
4,3494213
5,3408595
6,3253507
7,3357514
8,3129149
9,3130443


#####Read Customer files

In [0]:
customers_file = "dbfs:/FileStore/customer"
df_customers = spark.read.parquet(customers_file)


In [0]:
df_customers.show(5)

+----------+-------------+---+------+----------+-----+-----------+
|   cust_id|         name|age|gender|  birthday|  zip|       city|
+----------+-------------+---+------+----------+-----+-----------+
|C007YEYTX9| Aaron Abbott| 34|Female| 7/13/1991|97823|     boston|
|C00B971T1J| Aaron Austin| 37|Female|12/16/2004|30332|    chicago|
|C00WRSJF1Q| Aaron Barnes| 29|Female| 3/11/1977|23451|     denver|
|C01AZWQMF3|Aaron Barrett| 31|  Male|  7/9/1998|46613|los_angeles|
|C01BKUFRHA| Aaron Becker| 54|  Male|11/24/1979|40284|  san_diego|
+----------+-------------+---+------+----------+-----+-----------+
only showing top 5 rows



## **Narrow-Transformation**


In [0]:
#Narrow transformation
df_narrow_transform = (
    df_customers
    .filter(F.col("city") == "boston")
    .withColumn("first_name", F.split("name", " ").getItem(0))
    .withColumn("last_name", F.split("name", " ").getItem(1))
    .withColumn("age", F.col("age") + F.lit(5))
    .select("cust_id", "first_name", "last_name", "age", "gender", "birthday")
)


In [0]:

df_narrow_transform.show(5, False)
df_narrow_transform.explain(True)

+----------+----------+---------+----+------+---------+
|cust_id   |first_name|last_name|age |gender|birthday |
+----------+----------+---------+----+------+---------+
|C007YEYTX9|Aaron     |Abbott   |39.0|Female|7/13/1991|
|C08XAQUY73|Aaron     |Lambert  |59.0|Female|11/5/1966|
|C094P1VXF9|Aaron     |Lindsey  |29.0|Male  |9/21/1990|
|C097SHE1EF|Aaron     |Lopez    |27.0|Female|4/18/2001|
|C0DTC6436T|Aaron     |Schwartz |57.0|Female|7/9/1962 |
+----------+----------+---------+----+------+---------+
only showing top 5 rows

== Parsed Logical Plan ==
'Project ['cust_id, 'first_name, 'last_name, 'age, 'gender, 'birthday]
+- Project [cust_id#226, name#227, (cast(age#228 as double) + cast(5 as double)) AS age#303, gender#229, birthday#230, zip#231, city#232, first_name#284, last_name#293]
   +- Project [cust_id#226, name#227, age#228, gender#229, birthday#230, zip#231, city#232, first_name#284, split(name#227,  , -1)[1] AS last_name#293]
      +- Project [cust_id#226, name#227, age#228, gen

In [0]:
df_narrow_transform.write.format('noop').mode('overwrite').save('dbfs:/FileStore/dummy')

## **Wide-Transformation**

### Repartition

In [0]:
df_transactions.rdd.getNumPartitions()

Out[14]: 12

In [0]:
df_transactions.show(4)

In [0]:
df_customers.rdd.getNumPartitions()

Out[15]: 1

In [0]:
#display(df_transactions.withColumn('partitionId',F.spark_partition_id()).groupBy('partitionId').count())

In [0]:
df_transactions.write.format('noop').mode('overwrite').save('dbfs:/FileStore/dummy')

In [0]:
df_transactions.repartition(8).explain(True)

### Coalesce

In [0]:
df_transactions.coalesce(2).explain(True)

### Joins

In [0]:
df_transactions.cache()
df_customers.cache()

Out[20]: DataFrame[cust_id: string, name: string, age: string, gender: string, birthday: string, zip: string, city: string]

#####Disbale adaptive query and auto broadcast

In [0]:
spark.conf.set("spark.sql.autoBroadcastJoinThreshold",-1)
#spark.conf.set("spark.sql.adaptive.enabled", "false")

In [0]:
spark.conf.set("spark.sql.adaptive.enabled", "true")

In [0]:
df_joined = (
    df_transactions.join(
        df_customers,
        how="inner",
        on="cust_id"
    )
)

df_joined.explain(True)

== Parsed Logical Plan ==
'Join UsingJoin(Inner,Buffer(cust_id))
:- Relation [cust_id#2,start_date#3,end_date#4,txn_id#5,date#6,year#7,month#8,day#9,expense_type#10,amt#11,city#12] parquet
+- Relation [cust_id#226,name#227,age#228,gender#229,birthday#230,zip#231,city#232] parquet

== Analyzed Logical Plan ==
cust_id: string, start_date: string, end_date: string, txn_id: string, date: string, year: string, month: string, day: string, expense_type: string, amt: string, city: string, name: string, age: string, gender: string, birthday: string, zip: string, city: string
Project [cust_id#2, start_date#3, end_date#4, txn_id#5, date#6, year#7, month#8, day#9, expense_type#10, amt#11, city#12, name#227, age#228, gender#229, birthday#230, zip#231, city#232]
+- Join Inner, (cust_id#2 = cust_id#226)
   :- Relation [cust_id#2,start_date#3,end_date#4,txn_id#5,date#6,year#7,month#8,day#9,expense_type#10,amt#11,city#12] parquet
   +- Relation [cust_id#226,name#227,age#228,gender#229,birthday#230,zip#

In [0]:
df_joined.write.format('noop').mode('overwrite').save('dbfs:/FileStore')

In [0]:
df_joined.rdd.getNumPartitions()

Out[30]: 24

### Group by

In [0]:

    df_city_counts = (
        df_transactions
        .groupBy("city")
        .count()
    )


df_city_counts.explain(True)

== Parsed Logical Plan ==
'Aggregate ['city], ['city, count(1) AS count#7188L]
+- Relation [cust_id#2,start_date#3,end_date#4,txn_id#5,date#6,year#7,month#8,day#9,expense_type#10,amt#11,city#12] parquet

== Analyzed Logical Plan ==
city: string, count: bigint
Aggregate [city#12], [city#12, count(1) AS count#7188L]
+- Relation [cust_id#2,start_date#3,end_date#4,txn_id#5,date#6,year#7,month#8,day#9,expense_type#10,amt#11,city#12] parquet

== Optimized Logical Plan ==
Aggregate [city#12], [city#12, count(1) AS count#7188L]
+- Project [city#12]
   +- InMemoryRelation [cust_id#2, start_date#3, end_date#4, txn_id#5, date#6, year#7, month#8, day#9, expense_type#10, amt#11, city#12], StorageLevel(disk, memory, deserialized, 1 replicas)
         +- *(1) ColumnarToRow
            +- FileScan parquet [cust_id#2,start_date#3,end_date#4,txn_id#5,date#6,year#7,month#8,day#9,expense_type#10,amt#11,city#12] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[dbfs:/Fil

In [0]:
df_city_counts.show(30)

+-------------+-------+
|         city|  count|
+-------------+-------+
|    san_diego|3977780|
|      chicago|3979023|
|       denver|3980274|
|       boston|3978268|
|      seattle|3980022|
|  los_angeles|3982028|
|     new_york|3977480|
|san_francisco|3977094|
| philadelphia|3978193|
|     portland|3979930|
+-------------+-------+



In [0]:
df_city_counts.write.format('noop').mode('overwrite').save('dbfs:/FileStore')

In [0]:
df_city_counts.rdd.getNumPartitions()

Out[34]: 1