In [1]:
import pyspark 
from pyspark import SparkContext
sc = SparkContext()
spark = pyspark.sql.SparkSession(sc, jsparkSession=None)

In [2]:
spark

In [3]:
#Loading the data into a dataframe

mrkt_df = spark.read.option("inferSchema","true").option("header","true").option("delimiter", "\t").csv("/Users/charithaveeragandham/Downloads/marketing_campaign.csv")

In [4]:
#Total number of columns

print(len(mrkt_df.columns))

29


In [5]:
#printSchema() display schema of a dataframe i.e columns, thier datatypes and whether they are nullable
mrkt_df.printSchema()

root
 |-- ID: integer (nullable = true)
 |-- Year_Birth: integer (nullable = true)
 |-- Education: string (nullable = true)
 |-- Marital_Status: string (nullable = true)
 |-- Income: integer (nullable = true)
 |-- Kidhome: integer (nullable = true)
 |-- Teenhome: integer (nullable = true)
 |-- Dt_Customer: string (nullable = true)
 |-- Recency: integer (nullable = true)
 |-- MntWines: integer (nullable = true)
 |-- MntFruits: integer (nullable = true)
 |-- MntMeatProducts: integer (nullable = true)
 |-- MntFishProducts: integer (nullable = true)
 |-- MntSweetProducts: integer (nullable = true)
 |-- MntGoldProds: integer (nullable = true)
 |-- NumDealsPurchases: integer (nullable = true)
 |-- NumWebPurchases: integer (nullable = true)
 |-- NumCatalogPurchases: integer (nullable = true)
 |-- NumStorePurchases: integer (nullable = true)
 |-- NumWebVisitsMonth: integer (nullable = true)
 |-- AcceptedCmp3: integer (nullable = true)
 |-- AcceptedCmp4: integer (nullable = true)
 |-- AcceptedC

## Defining Schema

In [6]:
from pyspark.sql.types import *

#Sample data creation
data = ([
        (1,"Name1",5000),
        (2,"Name2",7000),
        (3,"Name3",8000),
        (4,"Name4",10000),
        (5,"Name5",9000)
])

#Defining the schema
schema = StructType([
            StructField("ID",IntegerType(),True),
            StructField("Name",StringType(),True),
            StructField("Salary",IntegerType(),True)
])

df = spark.createDataFrame(data,schema)

df.show()

+---+-----+------+
| ID| Name|Salary|
+---+-----+------+
|  1|Name1|  5000|
|  2|Name2|  7000|
|  3|Name3|  8000|
|  4|Name4| 10000|
|  5|Name5|  9000|
+---+-----+------+



# Actions

## show()

In [8]:
df.show()

+---+-----+------+
| ID| Name|Salary|
+---+-----+------+
|  1|Name1|  5000|
|  2|Name2|  7000|
|  3|Name3|  8000|
|  4|Name4| 10000|
|  5|Name5|  9000|
+---+-----+------+



## collect() 

In [9]:
df.collect()

[Row(ID=1, Name='Name1', Salary=5000),
 Row(ID=2, Name='Name2', Salary=7000),
 Row(ID=3, Name='Name3', Salary=8000),
 Row(ID=4, Name='Name4', Salary=10000),
 Row(ID=5, Name='Name5', Salary=9000)]

## take(), first() and head()

In [10]:
df.take(4)

[Row(ID=1, Name='Name1', Salary=5000),
 Row(ID=2, Name='Name2', Salary=7000),
 Row(ID=3, Name='Name3', Salary=8000),
 Row(ID=4, Name='Name4', Salary=10000)]

In [11]:
#Following 3 actions yield the same results

#Result type [row(cname1=val1,cname2=val2)]
df.take(1) 
#Result type row(cname1=val1,cname2=val2)
df.head() 
df.first()


Row(ID=1, Name='Name1', Salary=5000)

## saveAsTextFile()

In [None]:
saveAsTextFile() is an RDD action

In [13]:
#Converting the dataframe to an RDD since this action is not available for a dataframe
df.rdd.saveAsTextFile('saverdd')

In [14]:
import os
os.listdir('saverdd')

['._SUCCESS.crc',
 '.part-00000.crc',
 '.part-00001.crc',
 'part-00002',
 '.part-00003.crc',
 'part-00003',
 '.part-00002.crc',
 '_SUCCESS',
 'part-00001',
 'part-00000']

In [15]:
df.rdd.getNumPartitions()

4

## Writing a dataframe to a file

In [18]:
# A dataframe can be written to a file using write() and save() methods

#Both of these statament perform the same operation
#df.write.csv("savedf")
df.write.format("csv").save("savedf")

In [19]:
os.listdir('savedf')

['._SUCCESS.crc',
 '.part-00000-6dcbef47-26f8-4e2e-8140-ab8d15d24f9a-c000.csv.crc',
 'part-00003-6dcbef47-26f8-4e2e-8140-ab8d15d24f9a-c000.csv',
 '.part-00001-6dcbef47-26f8-4e2e-8140-ab8d15d24f9a-c000.csv.crc',
 '.part-00003-6dcbef47-26f8-4e2e-8140-ab8d15d24f9a-c000.csv.crc',
 '.part-00002-6dcbef47-26f8-4e2e-8140-ab8d15d24f9a-c000.csv.crc',
 'part-00002-6dcbef47-26f8-4e2e-8140-ab8d15d24f9a-c000.csv',
 '_SUCCESS',
 'part-00001-6dcbef47-26f8-4e2e-8140-ab8d15d24f9a-c000.csv',
 'part-00000-6dcbef47-26f8-4e2e-8140-ab8d15d24f9a-c000.csv']

# Transformations

## select

In [20]:
#Example of select

columns_subset1 = ['ID','Year_Birth','Marital_Status','Income','Kidhome','Teenhome','Recency']

select_df = mrkt_df.select(columns_subset1)

select_df.show(n=10)

+----+----------+--------------+------+-------+--------+-------+
|  ID|Year_Birth|Marital_Status|Income|Kidhome|Teenhome|Recency|
+----+----------+--------------+------+-------+--------+-------+
|5524|      1957|        Single| 58138|      0|       0|     58|
|2174|      1954|        Single| 46344|      1|       1|     38|
|4141|      1965|      Together| 71613|      0|       0|     26|
|6182|      1984|      Together| 26646|      1|       0|     26|
|5324|      1981|       Married| 58293|      1|       0|     94|
|7446|      1967|      Together| 62513|      0|       1|     16|
| 965|      1971|      Divorced| 55635|      0|       1|     34|
|6177|      1985|       Married| 33454|      1|       0|     32|
|4855|      1974|      Together| 30351|      1|       0|     19|
|5899|      1950|      Together|  5648|      1|       1|     68|
+----+----------+--------------+------+-------+--------+-------+
only showing top 10 rows



## selectExpr

In [21]:
#Example of selectExpr()

expr_df = select_df.selectExpr("*",("case when Income<50000 then 'low' else 'high' end as Income_bracket"))
expr_df.show(n=10)

#Changing col name using selectExpr()
expr_df.selectExpr("Income_bracket as Income_Range").show(n=10)

#Changing column type - Income was initially of type Int
expr_df.selectExpr("cast(Income as String)").printSchema()

+----+----------+--------------+------+-------+--------+-------+--------------+
|  ID|Year_Birth|Marital_Status|Income|Kidhome|Teenhome|Recency|Income_bracket|
+----+----------+--------------+------+-------+--------+-------+--------------+
|5524|      1957|        Single| 58138|      0|       0|     58|          high|
|2174|      1954|        Single| 46344|      1|       1|     38|           low|
|4141|      1965|      Together| 71613|      0|       0|     26|          high|
|6182|      1984|      Together| 26646|      1|       0|     26|           low|
|5324|      1981|       Married| 58293|      1|       0|     94|          high|
|7446|      1967|      Together| 62513|      0|       1|     16|          high|
| 965|      1971|      Divorced| 55635|      0|       1|     34|          high|
|6177|      1985|       Married| 33454|      1|       0|     32|           low|
|4855|      1974|      Together| 30351|      1|       0|     19|           low|
|5899|      1950|      Together|  5648| 

## map() vs flatMap()

In [22]:
#Example of map()

age_df=expr_df.rdd.map(lambda x: (x.ID,2022-x.Year_Birth)).toDF().selectExpr("_1 as ID","_2 as Age")

age_df.show()

+----+---+
|  ID|Age|
+----+---+
|5524| 65|
|2174| 68|
|4141| 57|
|6182| 38|
|5324| 41|
|7446| 55|
| 965| 51|
|6177| 37|
|4855| 48|
|5899| 72|
|1994| 39|
| 387| 46|
|2125| 63|
|8180| 70|
|2569| 35|
|2114| 76|
|9736| 42|
|4939| 76|
|6565| 73|
|2278| 37|
+----+---+
only showing top 20 rows



In [23]:
# To explain flatMap(), let us implement the word count problem

text = "This is a word count program. We first split the file and assign a value of one for each word. A groupBy is applied to count to number of times a word is repeated"

text = sc.textFile("file.txt")

text.flatMap(lambda x: x.split(" ")).map(lambda x : (x,1)).toDF().groupBy("_1").count().show()

+---------+-----+
|       _1|count|
+---------+-----+
|   steps.|    1|
|     bar.|    1|
|  appears|    1|
| document|    1|
|      you|    1|
|      for|    1|
|    Pages|    2|
|     Open|    1|
|    these|    1|
|     your|    2|
|   bottom|    1|
|       on|    1|
|document,|    1|
|   count.|    1|
|     View|    1|
|     Show|    1|
|      the|    6|
|     want|    1|
|      box|    1|
|      see|    2|
+---------+-----+
only showing top 20 rows



## filter() and where()

In [24]:
#Filter() and where() transformations are used to filter rows based on the condition given

expr_df.filter(expr_df.Income_bracket == "high").show(10)

expr_df.where(expr_df.Income_bracket == "high").show(10)

# As you can see both of these transformations perform the same operation

+----+----------+--------------+------+-------+--------+-------+--------------+
|  ID|Year_Birth|Marital_Status|Income|Kidhome|Teenhome|Recency|Income_bracket|
+----+----------+--------------+------+-------+--------+-------+--------------+
|5524|      1957|        Single| 58138|      0|       0|     58|          high|
|4141|      1965|      Together| 71613|      0|       0|     26|          high|
|5324|      1981|       Married| 58293|      1|       0|     94|          high|
|7446|      1967|      Together| 62513|      0|       1|     16|          high|
| 965|      1971|      Divorced| 55635|      0|       1|     34|          high|
|1994|      1983|       Married|  null|      1|       0|     11|          high|
|2125|      1959|      Divorced| 63033|      0|       0|     82|          high|
|8180|      1952|      Divorced| 59354|      1|       1|     53|          high|
|2114|      1946|        Single| 82800|      0|       0|     23|          high|
|6565|      1949|       Married| 76995| 

## withColumn

In [25]:
from pyspark.sql.functions import col
from pyspark.sql.functions import lit

#Create a new columns 
expr_df.withColumn("Age",2022-col("Year_Birth")).show(n=10)

#Rename a column
expr_df.withColumnRenamed("Income_bracket","Income_range").show(n=5)

#Adding a new column using withColumn and lit
expr_df.withColumn("Use-case",lit("Tutorial")).show(n=5)

#Updating an existing column value
expr_df.withColumn("Recency",col("Recency")-30).show(n=5)

#Changing datatype of a column
expr_df.select("Year_Birth").printSchema()
expr_df.withColumn("Year_Birth",col("Year_Birth").cast("string")).select("Year_Birth").printSchema()
expr_df.select("Year_Birth").printSchema()

#As you see here, the second expr_df.select("Year_Birth").printSchema() statement shows you that the dataframes are immutable
#In order to store these changes, a new dataframe should be created

+----+----------+--------------+------+-------+--------+-------+--------------+---+
|  ID|Year_Birth|Marital_Status|Income|Kidhome|Teenhome|Recency|Income_bracket|Age|
+----+----------+--------------+------+-------+--------+-------+--------------+---+
|5524|      1957|        Single| 58138|      0|       0|     58|          high| 65|
|2174|      1954|        Single| 46344|      1|       1|     38|           low| 68|
|4141|      1965|      Together| 71613|      0|       0|     26|          high| 57|
|6182|      1984|      Together| 26646|      1|       0|     26|           low| 38|
|5324|      1981|       Married| 58293|      1|       0|     94|          high| 41|
|7446|      1967|      Together| 62513|      0|       1|     16|          high| 55|
| 965|      1971|      Divorced| 55635|      0|       1|     34|          high| 51|
|6177|      1985|       Married| 33454|      1|       0|     32|           low| 37|
|4855|      1974|      Together| 30351|      1|       0|     19|           l

## Coalesce() and Repartition()

In [26]:
#Example of repartition()

expr_df.rdd.getNumPartitions()

expr_df.rdd.repartition(2).getNumPartitions()


2

In [27]:
#Example of coalesce()

expr_df.rdd.coalesce(1).getNumPartitions()

1

## join()

In [28]:
age_df.printSchema()
expr_df.join(age_df, age_df.ID==expr_df.ID,how="inner").show()

root
 |-- ID: long (nullable = true)
 |-- Age: long (nullable = true)

+----+----------+--------------+------+-------+--------+-------+--------------+----+---+
|  ID|Year_Birth|Marital_Status|Income|Kidhome|Teenhome|Recency|Income_bracket|  ID|Age|
+----+----------+--------------+------+-------+--------+-------+--------------+----+---+
|5524|      1957|        Single| 58138|      0|       0|     58|          high|5524| 65|
|2174|      1954|        Single| 46344|      1|       1|     38|           low|2174| 68|
|4141|      1965|      Together| 71613|      0|       0|     26|          high|4141| 57|
|6182|      1984|      Together| 26646|      1|       0|     26|           low|6182| 38|
|5324|      1981|       Married| 58293|      1|       0|     94|          high|5324| 41|
|7446|      1967|      Together| 62513|      0|       1|     16|          high|7446| 55|
| 965|      1971|      Divorced| 55635|      0|       1|     34|          high| 965| 51|
|6177|      1985|       Married| 33454|