In [1]:
 from pyspark.sql import SparkSession
import getpass
username = getpass.getuser()
spark = SparkSession. \
builder. \
config('spark.ui.port','0'). \
config("spark.sql.warehouse.dir", "/user/itv014119/warehouse"). \
enableHiveSupport(). \
master('yarn'). \
getOrCreate()

In [2]:
spark

### Creating a spark session and entering sample data n schema for that

In [6]:
sample = [
    (1,'Arun',32,30),
    (2,'John',40,45),
    (3,None,55,35),
    (4,'David',60,29),
    (5,'Beckam',70,80),
    (None,'Raju',39,69),
    (8,'Senpai',None,99),
    (9,'Nanami',20,None),
]

In [7]:
s_schema = "rollno int,name string,phy int,mat int"

### Creating a DataFrame

In [8]:
sample_df = spark.createDataFrame(data=sample,schema=s_schema)

In [9]:
sample_df.show()

+------+------+----+----+
|rollno|  name| phy| mat|
+------+------+----+----+
|     1|  Arun|  32|  30|
|     2|  John|  40|  45|
|     3|  null|  55|  35|
|     4| David|  60|  29|
|     5|Beckam|  70|  80|
|  null|  Raju|  39|  69|
|     8|Senpai|null|  99|
|     9|Nanami|  20|null|
+------+------+----+----+



### Dealing with Null

In [10]:
mean_phy = sample_df.selectExpr("mean(phy)").collect()[0][0]
mean_mat = sample_df.selectExpr("mean(mat)").collect()[0][0]

Updated_df = sample_df.fillna({'rollno':0,'name':'NA','phy':mean_phy,'mat':mean_mat})

In [11]:
Updated_df

rollno,name,phy,mat
1,Arun,32,30
2,John,40,45
3,,55,35
4,David,60,29
5,Beckam,70,80
0,Raju,39,69
8,Senpai,45,99
9,Nanami,20,55


### Renaming and creatin a new column "total" as total of two subjects using withColumn and expr

In [12]:
Renamed_df = Updated_df.withColumnRenamed("mat","chem")

In [13]:
Renamed_df.show()

+------+------+---+----+
|rollno|  name|phy|chem|
+------+------+---+----+
|     1|  Arun| 32|  30|
|     2|  John| 40|  45|
|     3|    NA| 55|  35|
|     4| David| 60|  29|
|     5|Beckam| 70|  80|
|     0|  Raju| 39|  69|
|     8|Senpai| 45|  99|
|     9|Nanami| 20|  55|
+------+------+---+----+



In [14]:
from pyspark.sql.functions import expr

#importing this since its sql expression as argumentsnot a default in py

In [15]:
new_df = Renamed_df.withColumn("total",expr("phy+chem"))

In [16]:
new_df.show()

+------+------+---+----+-----+
|rollno|  name|phy|chem|total|
+------+------+---+----+-----+
|     1|  Arun| 32|  30|   62|
|     2|  John| 40|  45|   85|
|     3|    NA| 55|  35|   90|
|     4| David| 60|  29|   89|
|     5|Beckam| 70|  80|  150|
|     0|  Raju| 39|  69|  108|
|     8|Senpai| 45|  99|  144|
|     9|Nanami| 20|  55|   75|
+------+------+---+----+-----+



## Knowing how many partitions used and repartitioning it to a single file for writing back to disk

In [18]:
new_df.rdd.getNumPartitions()

2

In [26]:
final_df = new_df.repartition(1)

In [27]:
final_df.rdd.getNumPartitions()

1

### Writing my file to disk (default is json am writing in csvand for compression technique using gzip isntead of deault snappy)

In [32]:
final_df.write \
.format("csv") \
.mode("overwrite") \
.option("header",True) \
.option("comrpession","gzip") \
.option("Path",'/user/itv014119/sample') \
.save()

### Reading the data from disk since I loaded the header on previous writes using it to read header from it, inferschema is not preferable but to showcase i used it now

In [33]:
reading_sample = spark.read \
.format("csv") \
.option("header",True) \
.option("inferSchema",True) \
.load("/user/itv014119/sample")

In [34]:
reading_sample.show()

+------+------+---+----+-----+
|rollno|  name|phy|chem|total|
+------+------+---+----+-----+
|     1|  Arun| 32|  30|   62|
|     2|  John| 40|  45|   85|
|     3|    NA| 55|  35|   90|
|     4| David| 60|  29|   89|
|     5|Beckam| 70|  80|  150|
|     0|  Raju| 39|  69|  108|
|     8|Senpai| 45|  99|  144|
|     9|Nanami| 20|  55|   75|
+------+------+---+----+-----+



### Joints 2 tables customers table and orders table

In [2]:
order_schema = "id long,date string,custid long,status string"

In [3]:
order_df = spark.read \
.format("csv") \
.schema(order_schema) \
.load("/public/trendytech/orders/orders_1gb.csv")

In [5]:
order_df.show(3)

+---+--------------------+------+---------------+
| id|                date|custid|         status|
+---+--------------------+------+---------------+
|  1|2013-07-25 00:00:...| 11599|         CLOSED|
|  2|2013-07-25 00:00:...|   256|PENDING_PAYMENT|
|  3|2013-07-25 00:00:...| 12111|       COMPLETE|
+---+--------------------+------+---------------+
only showing top 3 rows



In [4]:
customer_schema = "custids long,fname string,lname string,username string,pwd string,city string,state string,pincode string"

In [5]:
customer_df = spark.read \
.format("csv") \
.schema(customer_schema) \
.load("/public/trendytech/retail_db/customers")

In [8]:
customer_df.show(3)

+-------+-------+---------+---------+---------+--------------------+-----------+-------+
|custids|  fname|    lname| username|      pwd|                city|      state|pincode|
+-------+-------+---------+---------+---------+--------------------+-----------+-------+
|      1|Richard|Hernandez|XXXXXXXXX|XXXXXXXXX|  6303 Heather Plaza|Brownsville|     TX|
|      2|   Mary|  Barrett|XXXXXXXXX|XXXXXXXXX|9526 Noble Embers...|  Littleton|     CO|
|      3|    Ann|    Smith|XXXXXXXXX|XXXXXXXXX|3422 Blue Pioneer...|     Caguas|     PR|
+-------+-------+---------+---------+---------+--------------------+-----------+-------+
only showing top 3 rows



In [13]:
order_df.distinct().count()

#Wide transformation this has involved 200 shuffle partitions on the backend to get this data

68883

In [15]:
customer_df.distinct().count()

12435

In [16]:
order_df.join(customer_df,order_df.custid == customer_df.custids,"left").show(30)

+---+--------------------+------+---------------+-------+---------+-----------+---------+---------+--------------------+-------------+-------+
| id|                date|custid|         status|custids|    fname|      lname| username|      pwd|                city|        state|pincode|
+---+--------------------+------+---------------+-------+---------+-----------+---------+---------+--------------------+-------------+-------+
|  1|2013-07-25 00:00:...| 11599|         CLOSED|  11599|     Mary|     Malone|XXXXXXXXX|XXXXXXXXX|8708 Indian Horse...|      Hickory|     NC|
|  2|2013-07-25 00:00:...|   256|PENDING_PAYMENT|    256|    David|  Rodriguez|XXXXXXXXX|XXXXXXXXX|7605 Tawny Horse ...|      Chicago|     IL|
|  3|2013-07-25 00:00:...| 12111|       COMPLETE|  12111|    Amber|     Franco|XXXXXXXXX|XXXXXXXXX|8766 Clear Prairi...|   Santa Cruz|     CA|
|  4|2013-07-25 00:00:...|  8827|         CLOSED|   8827|    Brian|     Wilson|XXXXXXXXX|XXXXXXXXX|   8396 High Corners|  San Antonio|     TX|

In [6]:
joined_df = order_df.join(customer_df,order_df.custid == customer_df.custids,"left")

In [18]:
joined_df.rdd.getNumPartitions()

9

In [7]:
joined_df.show(5)

+---+--------------------+------+---------------+-------+-----+---------+---------+---------+--------------------+-----------+-------+
| id|                date|custid|         status|custids|fname|    lname| username|      pwd|                city|      state|pincode|
+---+--------------------+------+---------------+-------+-----+---------+---------+---------+--------------------+-----------+-------+
|  1|2013-07-25 00:00:...| 11599|         CLOSED|  11599| Mary|   Malone|XXXXXXXXX|XXXXXXXXX|8708 Indian Horse...|    Hickory|     NC|
|  2|2013-07-25 00:00:...|   256|PENDING_PAYMENT|    256|David|Rodriguez|XXXXXXXXX|XXXXXXXXX|7605 Tawny Horse ...|    Chicago|     IL|
|  3|2013-07-25 00:00:...| 12111|       COMPLETE|  12111|Amber|   Franco|XXXXXXXXX|XXXXXXXXX|8766 Clear Prairi...| Santa Cruz|     CA|
|  4|2013-07-25 00:00:...|  8827|         CLOSED|   8827|Brian|   Wilson|XXXXXXXXX|XXXXXXXXX|   8396 High Corners|San Antonio|     TX|
|  5|2013-07-25 00:00:...| 11318|       COMPLETE|  1131

In [9]:
joined_df.write \
.mode("overwrite") \
.option("header",True) \
.partitionBy("status") \
.option("path","/user/itv014119/sample/joined") \
.save()