In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import desc

In [2]:
spark = SparkSession.builder.appName("DataFrame-Demo").getOrCreate()

In [5]:
# Read CSV file into DataFrame

In [18]:
df = spark.read.option("delimiter",";").csv("./data.csv",header=True,inferSchema=True)
# option is used for ; in csv
# inferSchema is automatic finds the correct schema

In [19]:
# Display schema of DataFrame
df.printSchema()

#Display content
df.show(5)

root
 |-- DistributorCode: integer (nullable = true)
 |-- BranchCode: string (nullable = true)
 |-- SupervisorCode: string (nullable = true)
 |-- SupervisorName: string (nullable = true)
 |-- SalesRepCode: string (nullable = true)
 |-- SalesRepName: string (nullable = true)
 |-- SellerType: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- Mobileno: string (nullable = true)
 |-- EmailID: string (nullable = true)
 |-- LoginID: string (nullable = true)
 |-- Password: string (nullable = true)

+---------------+----------+--------------+--------------+------------+--------------------+----------+------+-------+--------+-------+---------------+--------+
|DistributorCode|BranchCode|SupervisorCode|SupervisorName|SalesRepCode|        SalesRepName|SellerType|Gender|Address|Mobileno|EmailID|        LoginID|Password|
+---------------+----------+--------------+--------------+------------+--------------------+----------+------+-------+-------

In [39]:
# Select: Choose specific column
selected_column = df.select("BranchCode","SalesRepName")
selected_column.show(10)

+----------+--------------------+
|BranchCode|        SalesRepName|
+----------+--------------------+
|     NGAN1|VSDL1-TRAN THI HA...|
|     NGAN1|  VSDL2-THAI DINH LY|
|     NGAN1|VSDL3-TRAN THI TU...|
|     NGAN1|VSDL4-THAI DINH TUAN|
|     NGAN1|VSDL5-NGUYEN THAI...|
|     NGAN1|VSDL7-NGUYEN CONG AU|
|     NGAN1|VSDL8-NGUYEN THI ...|
|     NGAN1|VS003-PHAN TIEN DUNG|
|     NGAN1|VS005-NGUYEN VAN ...|
|     NGAN1|VS006-NGUYEN HUU ...|
+----------+--------------------+
only showing top 10 rows



In [27]:
# Filter row base on a condition
filtered_data = df.filter(df.SalesRepCode.like("VS00%"))
filtered_data.show()

+---------------+----------+--------------+----------------+------------+--------------------+----------+------+-------+--------+-------+---------------+--------+
|DistributorCode|BranchCode|SupervisorCode|  SupervisorName|SalesRepCode|        SalesRepName|SellerType|Gender|Address|Mobileno|EmailID|        LoginID|Password|
+---------------+----------+--------------+----------------+------------+--------------------+----------+------+-------+--------+-------+---------------+--------+
|     2001584413|     NGAN1|       HUNG.NB| NGUYEN BUI HUNG|       VS003|VS003-PHAN TIEN DUNG|      NULL|  NULL|   NULL|    NULL|   NULL|DNANGNGAN1VS003|   vs003|
|     2001584413|     NGAN1|       HUNG.NB| NGUYEN BUI HUNG|       VS005|VS005-NGUYEN VAN ...|      NULL|  NULL|   NULL|    NULL|   NULL|DNANGNGAN1VS005|   vs005|
|     2001584413|     NGAN1|       HUNG.NB| NGUYEN BUI HUNG|       VS006|VS006-NGUYEN HUU ...|      NULL|  NULL|   NULL|    NULL|   NULL|DNANGNGAN1VS006|   vs006|
|     2001584413|     

In [78]:
# Group data by count, avg, sum
# Rename columns for better readable
grouped_data = df.groupBy("SupervisorCode").agg({"SalesRepCode":"count"}) \
    .withColumnRenamed("count(SalesRepCode)","Total")
grouped_data.show()

+--------------+-----+
|SupervisorCode|Total|
+--------------+-----+
|       HUNG.NB|    6|
|          NULL|    1|
|       TUAN.NV|    6|
|         HA.ND|    7|
|       MINH.NN|    6|
+--------------+-----+



In [76]:
# Sort by a column
# Rename columns for better readable
sorted_data = grouped_data.orderBy("count(SalesRepCode)",ascending=False) \
    .withColumnRenamed("count(SalesRepCode)","Total")
sorted_data.show()

+--------------+-----+
|SupervisorCode|Total|
+--------------+-----+
|         HA.ND|    7|
|       HUNG.NB|    6|
|       TUAN.NV|    6|
|       MINH.NN|    6|
|          NULL|    1|
+--------------+-----+



In [79]:
# Sort by multiple columns
from pyspark.sql.functions import col,desc
sorted_datas = grouped_data.orderBy(col("count(SalesRepCode)").desc(),col("SupervisorCode").desc())
sorted_datas.show()

+--------------+-----+
|SupervisorCode|Total|
+--------------+-----+
|         HA.ND|    7|
|       TUAN.NV|    6|
|       MINH.NN|    6|
|       HUNG.NB|    6|
|          NULL|    1|
+--------------+-----+



In [50]:
# Create a new df2 and join with df
df2 = df.select("BranchCode","SalesRepName").limit(2)

joined_data = df.limit(3).join(df2,"BranchCode")
joined_data.show()

+----------+---------------+--------------+--------------+------------+--------------------+----------+------+-------+--------+-------+---------------+--------+--------------------+
|BranchCode|DistributorCode|SupervisorCode|SupervisorName|SalesRepCode|        SalesRepName|SellerType|Gender|Address|Mobileno|EmailID|        LoginID|Password|        SalesRepName|
+----------+---------------+--------------+--------------+------------+--------------------+----------+------+-------+--------+-------+---------------+--------+--------------------+
|     NGAN1|     2001584413|         HA.ND|NGUYEN DINH HA|       VSDL1|VSDL1-TRAN THI HA...|      NULL|  NULL|   NULL|    NULL|   NULL|DNANGNGAN1VSDL1|   vsdl1|  VSDL2-THAI DINH LY|
|     NGAN1|     2001584413|         HA.ND|NGUYEN DINH HA|       VSDL1|VSDL1-TRAN THI HA...|      NULL|  NULL|   NULL|    NULL|   NULL|DNANGNGAN1VSDL1|   vsdl1|VSDL1-TRAN THI HA...|
|     NGAN1|     2001584413|         HA.ND|NGUYEN DINH HA|       VSDL2|  VSDL2-THAI DINH L

In [63]:
# Get distinct SupervisorCode
distinct_rows = df.select("SupervisorCode").distinct()
distinct_rows.show()

+--------------+
|SupervisorCode|
+--------------+
|       HUNG.NB|
|       TUAN.NV|
|         HA.ND|
|       MINH.NN|
|          NULL|
+--------------+



In [66]:
# Drop: Remove specified columns
dropped_columns = df.drop("SellerType","Gender","Address","Mobileno","EmailID")
dropped_columns.show(10)

+---------------+----------+--------------+---------------+------------+--------------------+---------------+--------+
|DistributorCode|BranchCode|SupervisorCode| SupervisorName|SalesRepCode|        SalesRepName|        LoginID|Password|
+---------------+----------+--------------+---------------+------------+--------------------+---------------+--------+
|     2001584413|     NGAN1|         HA.ND| NGUYEN DINH HA|       VSDL1|VSDL1-TRAN THI HA...|DNANGNGAN1VSDL1|   vsdl1|
|     2001584413|     NGAN1|         HA.ND| NGUYEN DINH HA|       VSDL2|  VSDL2-THAI DINH LY|DNANGNGAN1VSDL2|   vsdl2|
|     2001584413|     NGAN1|         HA.ND| NGUYEN DINH HA|       VSDL3|VSDL3-TRAN THI TU...|DNANGNGAN1VSDL3|   vsdl3|
|     2001584413|     NGAN1|         HA.ND| NGUYEN DINH HA|       VSDL4|VSDL4-THAI DINH TUAN|DNANGNGAN1VSDL4|   vsdl4|
|     2001584413|     NGAN1|         HA.ND| NGUYEN DINH HA|       VSDL5|VSDL5-NGUYEN THAI...|DNANGNGAN1VSDL5|   vsdl5|
|     2001584413|     NGAN1|         HA.ND| NGUY

In [75]:
# WithColumn: Add new calculated columns
from pyspark.sql.functions import concat, lit
# concat is concatenating 2 columns. The lit function adds a literal space between the two concatenated columns.
df_with_new_column = dropped_columns.withColumn("Full Info", concat(col("SupervisorCode"), lit(" "), col("SupervisorName")))
df_with_new_column.show()

+---------------+----------+--------------+----------------+------------+--------------------+---------------+--------+--------------------+
|DistributorCode|BranchCode|SupervisorCode|  SupervisorName|SalesRepCode|        SalesRepName|        LoginID|Password|           Full Info|
+---------------+----------+--------------+----------------+------------+--------------------+---------------+--------+--------------------+
|     2001584413|     NGAN1|         HA.ND|  NGUYEN DINH HA|       VSDL1|VSDL1-TRAN THI HA...|DNANGNGAN1VSDL1|   vsdl1|HA.ND-NGUYEN DINH HA|
|     2001584413|     NGAN1|         HA.ND|  NGUYEN DINH HA|       VSDL2|  VSDL2-THAI DINH LY|DNANGNGAN1VSDL2|   vsdl2|HA.ND-NGUYEN DINH HA|
|     2001584413|     NGAN1|         HA.ND|  NGUYEN DINH HA|       VSDL3|VSDL3-TRAN THI TU...|DNANGNGAN1VSDL3|   vsdl3|HA.ND-NGUYEN DINH HA|
|     2001584413|     NGAN1|         HA.ND|  NGUYEN DINH HA|       VSDL4|VSDL4-THAI DINH TUAN|DNANGNGAN1VSDL4|   vsdl4|HA.ND-NGUYEN DINH HA|
|     2001584