In [1]:
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder \
    .appName("basic") \
    .master("local[*]") \
    .getOrCreate()

In [3]:
df = spark.read.csv("D:\Business Analytics\Data Visualization\Power BI\Power BI DAX\Section 1\Customers.csv", header=True, inferSchema=True)

In [4]:
df.limit(5).show()

+-----------+------+---------+--------+---------+-------------+------+--------------------+------------+-------------+--------------+------------+---------+
|CustomerKey|Prefix|FirstName|LastName|BirthDate|MaritalStatus|Gender|        EmailAddress|AnnualIncome|TotalChildren|EducationLevel|  Occupation|HomeOwner|
+-----------+------+---------+--------+---------+-------------+------+--------------------+------------+-------------+--------------+------------+---------+
|      11000|   MR.|    PAVAN| LALWANI| 4/8/1996|            M|     M|jon24@adventure-w...|    $90,000 |            2|     Bachelors|Professional|        Y|
|      11001|   MR.|   EUGENE|   HUANG|5/14/1965|            S|     M|eugene10@adventur...|    $60,000 |            3|     Bachelors|Professional|        N|
|      11002|   MR.|    RUBEN|  TORRES|8/12/1965|            M|     M|ruben35@adventure...|    $60,000 |            3|     Bachelors|Professional|        Y|
|      11003|   MS.|  CHRISTY|     ZHU|2/15/1968|         

In [5]:
from pyspark.sql.functions import *

In [6]:
from pyspark.sql.types import *

In [7]:
#Change the datatype of BirthDate to Date

df_new = df.withColumn("DOB", col("BirthDate").cast(DateType()))

In [8]:
df_new.limit(5).show()

+-----------+------+---------+--------+---------+-------------+------+--------------------+------------+-------------+--------------+------------+---------+----+
|CustomerKey|Prefix|FirstName|LastName|BirthDate|MaritalStatus|Gender|        EmailAddress|AnnualIncome|TotalChildren|EducationLevel|  Occupation|HomeOwner| DOB|
+-----------+------+---------+--------+---------+-------------+------+--------------------+------------+-------------+--------------+------------+---------+----+
|      11000|   MR.|    PAVAN| LALWANI| 4/8/1996|            M|     M|jon24@adventure-w...|    $90,000 |            2|     Bachelors|Professional|        Y|NULL|
|      11001|   MR.|   EUGENE|   HUANG|5/14/1965|            S|     M|eugene10@adventur...|    $60,000 |            3|     Bachelors|Professional|        N|NULL|
|      11002|   MR.|    RUBEN|  TORRES|8/12/1965|            M|     M|ruben35@adventure...|    $60,000 |            3|     Bachelors|Professional|        Y|NULL|
|      11003|   MS.|  CHRIST

In [9]:
df_new = df.withColumn("DOB", to_date(col("BirthDate"), "M/d/yyyy"))

In [10]:
df_new.limit(5).show()

+-----------+------+---------+--------+---------+-------------+------+--------------------+------------+-------------+--------------+------------+---------+----------+
|CustomerKey|Prefix|FirstName|LastName|BirthDate|MaritalStatus|Gender|        EmailAddress|AnnualIncome|TotalChildren|EducationLevel|  Occupation|HomeOwner|       DOB|
+-----------+------+---------+--------+---------+-------------+------+--------------------+------------+-------------+--------------+------------+---------+----------+
|      11000|   MR.|    PAVAN| LALWANI| 4/8/1996|            M|     M|jon24@adventure-w...|    $90,000 |            2|     Bachelors|Professional|        Y|1996-04-08|
|      11001|   MR.|   EUGENE|   HUANG|5/14/1965|            S|     M|eugene10@adventur...|    $60,000 |            3|     Bachelors|Professional|        N|1965-05-14|
|      11002|   MR.|    RUBEN|  TORRES|8/12/1965|            M|     M|ruben35@adventure...|    $60,000 |            3|     Bachelors|Professional|        Y|1965

In [11]:
df_new.printSchema()

root
 |-- CustomerKey: integer (nullable = true)
 |-- Prefix: string (nullable = true)
 |-- FirstName: string (nullable = true)
 |-- LastName: string (nullable = true)
 |-- BirthDate: string (nullable = true)
 |-- MaritalStatus: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- EmailAddress: string (nullable = true)
 |-- AnnualIncome: string (nullable = true)
 |-- TotalChildren: integer (nullable = true)
 |-- EducationLevel: string (nullable = true)
 |-- Occupation: string (nullable = true)
 |-- HomeOwner: string (nullable = true)
 |-- DOB: date (nullable = true)



In [12]:
df_new = df_new.drop("BirthDate")

In [13]:
df_new.limit(5).show()

+-----------+------+---------+--------+-------------+------+--------------------+------------+-------------+--------------+------------+---------+----------+
|CustomerKey|Prefix|FirstName|LastName|MaritalStatus|Gender|        EmailAddress|AnnualIncome|TotalChildren|EducationLevel|  Occupation|HomeOwner|       DOB|
+-----------+------+---------+--------+-------------+------+--------------------+------------+-------------+--------------+------------+---------+----------+
|      11000|   MR.|    PAVAN| LALWANI|            M|     M|jon24@adventure-w...|    $90,000 |            2|     Bachelors|Professional|        Y|1996-04-08|
|      11001|   MR.|   EUGENE|   HUANG|            S|     M|eugene10@adventur...|    $60,000 |            3|     Bachelors|Professional|        N|1965-05-14|
|      11002|   MR.|    RUBEN|  TORRES|            M|     M|ruben35@adventure...|    $60,000 |            3|     Bachelors|Professional|        Y|1965-08-12|
|      11003|   MS.|  CHRISTY|     ZHU|            S

In [14]:
df_new.select(col("MaritalStatus")).distinct().show()

+-------------+
|MaritalStatus|
+-------------+
|            M|
|            S|
+-------------+



In [15]:
df_new = df_new.withColumn("MaritalStatus", when(col("MaritalStatus") == "M", "Married").otherwise("Single"))

In [16]:
from pyspark.sql.functions import regexp_replace

In [17]:
df_new = df_new.withColumn("AnnualIncome", regexp_replace("AnnualIncome", "[$,]", "").cast('int'))

In [18]:
df_new.printSchema()

root
 |-- CustomerKey: integer (nullable = true)
 |-- Prefix: string (nullable = true)
 |-- FirstName: string (nullable = true)
 |-- LastName: string (nullable = true)
 |-- MaritalStatus: string (nullable = false)
 |-- Gender: string (nullable = true)
 |-- EmailAddress: string (nullable = true)
 |-- AnnualIncome: integer (nullable = true)
 |-- TotalChildren: integer (nullable = true)
 |-- EducationLevel: string (nullable = true)
 |-- Occupation: string (nullable = true)
 |-- HomeOwner: string (nullable = true)
 |-- DOB: date (nullable = true)



In [19]:
group_by_col = ["MaritalStatus","Occupation","EducationLevel"]

In [20]:
df_new.groupBy(group_by_col)\
    .agg(
        countDistinct("CustomerKey").alias("Total Customer"),
        sum("AnnualIncome").alias("Total Income")
    ).show()

+-------------+--------------+-------------------+--------------+------------+
|MaritalStatus|    Occupation|     EducationLevel|Total Customer|Total Income|
+-------------+--------------+-------------------+--------------+------------+
|       Single|        Manual|          Bachelors|            29|      330000|
|      Married|      Clerical|    Graduate Degree|           288|     9120000|
|      Married|  Professional|          Bachelors|           814|    56430000|
|       Single|        Manual|        High School|           655|    12680000|
|       Single|    Management|          Bachelors|           575|    51340000|
|      Married|Skilled Manual|Partial High School|           207|    13300000|
|       Single|      Clerical|    Partial College|           729|    24390000|
|       Single|    Management|        High School|           105|    11400000|
|      Married|    Management|    Graduate Degree|           628|    54960000|
|       Single|Skilled Manual|Partial High School|  

In [21]:
df_new.groupBy("Occupation")\
    .agg(
        countDistinct("CustomerKey").alias("Total Customer"),
        sum("AnnualIncome").alias("Total Income")
    ).sort(desc("Total Income")).show()

+--------------+--------------+------------+
|    Occupation|Total Customer|Total Income|
+--------------+--------------+------------+
|  Professional|          5424|   402280000|
|    Management|          3011|   277670000|
|Skilled Manual|          4501|   232850000|
|      Clerical|          2859|    87760000|
|        Manual|          2353|    38760000|
+--------------+--------------+------------+



In [22]:
df_new.createTempView("customer")

In [30]:
sql_result = spark.sql("""Select * from {tab} LIMIT {lim}""".format(tab = "customer", lim = 5))

In [31]:
sql_result.show()

+-----------+------+---------+--------+-------------+------+--------------------+------------+-------------+--------------+------------+---------+----------+
|CustomerKey|Prefix|FirstName|LastName|MaritalStatus|Gender|        EmailAddress|AnnualIncome|TotalChildren|EducationLevel|  Occupation|HomeOwner|       DOB|
+-----------+------+---------+--------+-------------+------+--------------------+------------+-------------+--------------+------------+---------+----------+
|      11000|   MR.|    PAVAN| LALWANI|      Married|     M|jon24@adventure-w...|       90000|            2|     Bachelors|Professional|        Y|1996-04-08|
|      11001|   MR.|   EUGENE|   HUANG|       Single|     M|eugene10@adventur...|       60000|            3|     Bachelors|Professional|        N|1965-05-14|
|      11002|   MR.|    RUBEN|  TORRES|      Married|     M|ruben35@adventure...|       60000|            3|     Bachelors|Professional|        Y|1965-08-12|
|      11003|   MS.|  CHRISTY|     ZHU|       Single