##### File reading

In [187]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *

spark = SparkSession.builder.appName("file_read").getOrCreate()

df = spark.read.format('csv')\
               .option("inferSchema",True)\
               .option("header",True)\
               .load("C:/Git files/My git files/PySpark/files/test.csv")
df.show(2)

+-----+---------------+----------+---------+---------------+-----------------+--------+------------+----------------+--------------------+-----------------+--------------------+
|Index|    Customer Id|First Name|Last Name|        Company|             City| Country|     Phone 1|         Phone 2|               Email|Subscription Date|             Website|
+-----+---------------+----------+---------+---------------+-----------------+--------+------------+----------------+--------------------+-----------------+--------------------+
|    1|DD37Cf93aecA6Dc|    Sheryl|   Baxter|Rasmussen Group|     East Leonard|   Chile|229.077.5154|397.884.0519x718|zunigavanessa@smi...|       2020-08-24|http://www.stephe...|
|    2|1Ef7b82A4CAAD10|   Preston|   Lozano|    Vega-Gentry|East Jimmychester|Djibouti|  5153435776|686-620-1820x944|     vmata@colon.com|       2021-04-23|http://www.hobbs....|
+-----+---------------+----------+---------+---------------+-----------------+--------+------------+----------

#### schema

In [188]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *

spark = SparkSession.builder.appName("file_read").getOrCreate()


sales_data = [
    {"sale_id": 1,  "date": "2025-01-03", "customer": "Arun",     "product": "Laptop",      "amount": 56000},
    {"sale_id": 2,  "date": "2025-01-04", "customer": "Meena",    "product": "Mouse",       "amount": 800},
    {"sale_id": 3,  "date": "2025-01-05", "customer": "John",     "product": "Keyboard",    "amount": 1500},
    {"sale_id": 4,  "date": "2025-01-06", "customer": "Priya",    "product": "Monitor",     "amount": 7200},
    {"sale_id": 5,  "date": "2025-01-07", "customer": "Sneha",    "product": "Laptop",      "amount": 53000},
    {"sale_id": 6,  "date": "2025-01-08", "customer": "Kiran",    "product": "Tablet",      "amount": 18000},
    {"sale_id": 7,  "date": "2025-01-09", "customer": "Rahul",    "product": "Earphones",   "amount": 1200},
    {"sale_id": 8,  "date": "2025-01-10", "customer": "Vijay",    "product": "Speaker",     "amount": 2500},
    {"sale_id": 9,  "date": "2025-01-11", "customer": "Anjali",   "product": "Webcam",      "amount": 3000},
    {"sale_id": 10, "date": "2025-01-12", "customer": "Nitin",    "product": "Printer",     "amount": 9000},
    {"sale_id": 11, "date": "2025-01-13", "customer": "Divya",    "product": "Laptop",      "amount": 59000},
    {"sale_id": 12, "date": "2025-01-14", "customer": "Harish",   "product": "Mouse",       "amount": 850},
    {"sale_id": 13, "date": "2025-01-15", "customer": "Suman",    "product": "Keyboard",    "amount": 1600},
    {"sale_id": 14, "date": "2025-01-16", "customer": "Geeta",    "product": "Monitor",     "amount": 7100},
    {"sale_id": 15, "date": "2025-01-17", "customer": "Mahesh",   "product": "Tablet",      "amount": 17500},
    {"sale_id": 16, "date": "2025-01-18", "customer": "Asha",     "product": "Speaker",     "amount": 2600},
    {"sale_id": 17, "date": "2025-01-19", "customer": "Lokesh",   "product": "Laptop",      "amount": 60000},
    {"sale_id": 18, "date": "2025-01-20", "customer": "Tarun",    "product": "Webcam",      "amount": 2800},
    {"sale_id": 19, "date": "2025-01-21", "customer": "Gauri",    "product": "Earphones",   "amount": 1100},
    {"sale_id": 20, "date": "2025-01-22", "customer": "Sathish",  "product": "Printer",     "amount": 9500}
]

df2 = spark.createDataFrame(data=sales_data)
df2.show(4)

+------+--------+----------+--------+-------+
|amount|customer|      date| product|sale_id|
+------+--------+----------+--------+-------+
| 56000|    Arun|2025-01-03|  Laptop|      1|
|   800|   Meena|2025-01-04|   Mouse|      2|
|  1500|    John|2025-01-05|Keyboard|      3|
|  7200|   Priya|2025-01-06| Monitor|      4|
+------+--------+----------+--------+-------+
only showing top 4 rows



In [189]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *

spark = SparkSession.builder.appName("file_read").getOrCreate()
my_struct_schema = StructType([
    StructField("sale_id", IntegerType(), True),
    StructField("date", DateType(), True),
    StructField("customer", StringType(), True),
    StructField("product", StringType(), True),
    StructField("amount", DoubleType(), True)
    
])
print("Struct_schema")
df2 = spark.read.format('csv')\
                .schema(my_struct_schema)\
                .option('header',True)\
                .load("C:/Git files/My git files/PySpark/files/sales.csv")

df2.printSchema()

my_ddl_schema = """
sale_id INT,
date STRING,
customer STRING,
product STRING,
amount DOUBLE
"""

print("ddl_scehma")
df1 = spark.read.format('csv')\
                .schema(my_ddl_schema)\
                .option('header',True)\
                .load("C:/Git files/My git files/PySpark/files/sales.csv")

df1.printSchema()


Struct_schema
root
 |-- sale_id: integer (nullable = true)
 |-- date: date (nullable = true)
 |-- customer: string (nullable = true)
 |-- product: string (nullable = true)
 |-- amount: double (nullable = true)

ddl_scehma
root
 |-- sale_id: integer (nullable = true)
 |-- date: string (nullable = true)
 |-- customer: string (nullable = true)
 |-- product: string (nullable = true)
 |-- amount: double (nullable = true)



#### select

In [190]:
df2.select('customer','amount').show(3)

+--------+-------+
|customer| amount|
+--------+-------+
|    Arun|56000.0|
|   Meena|  800.0|
|    John| 1500.0|
+--------+-------+
only showing top 3 rows



#### alias

In [191]:
df2.select(col("customer").alias("customer_name")).show(4)

+-------------+
|customer_name|
+-------------+
|         Arun|
|        Meena|
|         John|
|        Priya|
+-------------+
only showing top 4 rows



#### filter

In [192]:
df2.filter(col("customer").startswith('A')).show()

+-------+----------+--------+-------+-------+
|sale_id|      date|customer|product| amount|
+-------+----------+--------+-------+-------+
|      1|2025-01-03|    Arun| Laptop|56000.0|
|      9|2025-01-11|  Anjali| Webcam| 3000.0|
|     16|2025-01-18|    Asha|Speaker| 2600.0|
+-------+----------+--------+-------+-------+



#### withColumnRenamed

In [193]:
df1 = df1.withColumnRenamed("customer","customer_name")
df1.show(5)

+-------+----------+-------------+--------+-------+
|sale_id|      date|customer_name| product| amount|
+-------+----------+-------------+--------+-------+
|      1|2025-01-03|         Arun|  Laptop|56000.0|
|      2|2025-01-04|        Meena|   Mouse|  800.0|
|      3|2025-01-05|         John|Keyboard| 1500.0|
|      4|2025-01-06|        Priya| Monitor| 7200.0|
|      5|2025-01-07|        Sneha|  Laptop|53000.0|
+-------+----------+-------------+--------+-------+
only showing top 5 rows



#### Typecasting()

In [194]:
df1 = df1.withColumn("date",col("date").cast(DateType()))
df1.printSchema()

root
 |-- sale_id: integer (nullable = true)
 |-- date: date (nullable = true)
 |-- customer_name: string (nullable = true)
 |-- product: string (nullable = true)
 |-- amount: double (nullable = true)



#### sort

In [195]:
df1.sort(col("customer_name").desc()).show(4)
df1.sort(["sale_id","customer_name"],ascending=[0,1]).show(4)

+-------+----------+-------------+--------+-------+
|sale_id|      date|customer_name| product| amount|
+-------+----------+-------------+--------+-------+
|      8|2025-01-10|        Vijay| Speaker| 2500.0|
|     18|2025-01-20|        Tarun|  Webcam| 2800.0|
|     13|2025-01-15|        Suman|Keyboard| 1600.0|
|      5|2025-01-07|        Sneha|  Laptop|53000.0|
+-------+----------+-------------+--------+-------+
only showing top 4 rows

+-------+----------+-------------+---------+-------+
|sale_id|      date|customer_name|  product| amount|
+-------+----------+-------------+---------+-------+
|     20|2025-01-22|      Sathish|  Printer| 9500.0|
|     19|2025-01-21|        Gauri|Earphones| 1100.0|
|     18|2025-01-20|        Tarun|   Webcam| 2800.0|
|     17|2025-01-19|       Lokesh|   Laptop|60000.0|
+-------+----------+-------------+---------+-------+
only showing top 4 rows



#### limit

In [196]:
df1.limit(5).show()

+-------+----------+-------------+--------+-------+
|sale_id|      date|customer_name| product| amount|
+-------+----------+-------------+--------+-------+
|      1|2025-01-03|         Arun|  Laptop|56000.0|
|      2|2025-01-04|        Meena|   Mouse|  800.0|
|      3|2025-01-05|         John|Keyboard| 1500.0|
|      4|2025-01-06|        Priya| Monitor| 7200.0|
|      5|2025-01-07|        Sneha|  Laptop|53000.0|
+-------+----------+-------------+--------+-------+



#### Drop

In [197]:
df1.drop(col("product")).show(4)
df1.drop("product","amount").show(4)

+-------+----------+-------------+-------+
|sale_id|      date|customer_name| amount|
+-------+----------+-------------+-------+
|      1|2025-01-03|         Arun|56000.0|
|      2|2025-01-04|        Meena|  800.0|
|      3|2025-01-05|         John| 1500.0|
|      4|2025-01-06|        Priya| 7200.0|
+-------+----------+-------------+-------+
only showing top 4 rows

+-------+----------+-------------+
|sale_id|      date|customer_name|
+-------+----------+-------------+
|      1|2025-01-03|         Arun|
|      2|2025-01-04|        Meena|
|      3|2025-01-05|         John|
|      4|2025-01-06|        Priya|
+-------+----------+-------------+
only showing top 4 rows



#### drop_duplicates

In [198]:
df1.dropDuplicates().show(4)
df1.dropDuplicates(subset=["customer_name","product"]).show(4)

+-------+----------+-------------+---------+-------+
|sale_id|      date|customer_name|  product| amount|
+-------+----------+-------------+---------+-------+
|      5|2025-01-07|        Sneha|   Laptop|53000.0|
|     17|2025-01-19|       Lokesh|   Laptop|60000.0|
|      7|2025-01-09|        Rahul|Earphones| 1200.0|
|      3|2025-01-05|         John| Keyboard| 1500.0|
+-------+----------+-------------+---------+-------+
only showing top 4 rows

+-------+----------+-------------+--------+-------+
|sale_id|      date|customer_name| product| amount|
+-------+----------+-------------+--------+-------+
|     20|2025-01-22|      Sathish| Printer| 9500.0|
|     11|2025-01-13|        Divya|  Laptop|59000.0|
|      2|2025-01-04|        Meena|   Mouse|  800.0|
|     13|2025-01-15|        Suman|Keyboard| 1600.0|
+-------+----------+-------------+--------+-------+
only showing top 4 rows



#### union or unionByName()

In [199]:
data1 = [
        {"sale_id": 1, "date": "2025-01-03", "customer": "Arun", "product": "Laptop", "amount": 56000}, 
        {"sale_id": 2, "date": "2025-01-04", "customer": "Meena", "product": "Mouse", "amount": 800}
        ]

data2 =[
    {"sale_id": 3, "date": "2025-01-05", "customer": "John", "product": "Keyboard", "amount": 1500}, 
    {"sale_id": 4, "date": "2025-01-06", "customer": "Priya", "product": "Monitor", "amount": 7200}, 
    {"sale_id": 5, "date": "2025-01-07", "customer": "Sneha", "product": "Laptop", "amount": 53000}
]

df_data1 = spark.createDataFrame(data=data1)
df_data2 = spark.createDataFrame(data=data2)

df= df_data1.union(df_data2)
df.show()

df= df_data1.unionByName(df_data2)
df.show()

+------+--------+----------+--------+-------+
|amount|customer|      date| product|sale_id|
+------+--------+----------+--------+-------+
| 56000|    Arun|2025-01-03|  Laptop|      1|
|   800|   Meena|2025-01-04|   Mouse|      2|
|  1500|    John|2025-01-05|Keyboard|      3|
|  7200|   Priya|2025-01-06| Monitor|      4|
| 53000|   Sneha|2025-01-07|  Laptop|      5|
+------+--------+----------+--------+-------+

+------+--------+----------+--------+-------+
|amount|customer|      date| product|sale_id|
+------+--------+----------+--------+-------+
| 56000|    Arun|2025-01-03|  Laptop|      1|
|   800|   Meena|2025-01-04|   Mouse|      2|
|  1500|    John|2025-01-05|Keyboard|      3|
|  7200|   Priya|2025-01-06| Monitor|      4|
| 53000|   Sneha|2025-01-07|  Laptop|      5|
+------+--------+----------+--------+-------+



#### Srting function

In [200]:
#concat
df1 = df1.withColumn("full_name",concat_ws(" ",col("customer_name"),lit("   AAA   ")))
# df1.show(4)

#trim
df1 = df1.withColumn("trim_name",trim(col("full_name")))

#ltrim
df1 = df1.withColumn("ltrim_name",ltrim(col("full_name")))

#rtrim
df1 = df1.withColumn("rtrim_name",rtrim(col("full_name")))
df1.show(4)
df1 = df1.drop('full_name','trim_name','ltrim_name','rtrim_name')

+-------+----------+-------------+--------+-------+---------------+------------+---------------+------------+
|sale_id|      date|customer_name| product| amount|      full_name|   trim_name|     ltrim_name|  rtrim_name|
+-------+----------+-------------+--------+-------+---------------+------------+---------------+------------+
|      1|2025-01-03|         Arun|  Laptop|56000.0| Arun    AAA   | Arun    AAA| Arun    AAA   | Arun    AAA|
|      2|2025-01-04|        Meena|   Mouse|  800.0|Meena    AAA   |Meena    AAA|Meena    AAA   |Meena    AAA|
|      3|2025-01-05|         John|Keyboard| 1500.0| John    AAA   | John    AAA| John    AAA   | John    AAA|
|      4|2025-01-06|        Priya| Monitor| 7200.0|Priya    AAA   |Priya    AAA|Priya    AAA   |Priya    AAA|
+-------+----------+-------------+--------+-------+---------------+------------+---------------+------------+
only showing top 4 rows



#### Date fun

In [201]:
#current_date
df1 = df1.withColumn("current_date",current_date())

#date_add
df1 = df1.withColumn("after_7_days",date_add("current_date",7))

#date_sub
df1 =df1.withColumn("before_7_days",date_sub("current_date",7))

#date_diff
df1 = df1.withColumn("date_diff",date_diff("after_7_days","current_date"))

#last_day
df1 = df1.withColumn("last_day",last_day("current_date"))

#date_format
df1 = df1.withColumn("changed_date",date_format(col('date'),'dd-MM-yyyy'))

df1.select("date","changed_date","current_date","after_7_days","before_7_days","date_diff","last_day",).show(4)
df1 = df1.drop("changed_date","current_date","after_7_days","before_7_days","date_diff","last_day")

+----------+------------+------------+------------+-------------+---------+----------+
|      date|changed_date|current_date|after_7_days|before_7_days|date_diff|  last_day|
+----------+------------+------------+------------+-------------+---------+----------+
|2025-01-03|  03-01-2025|  2025-12-01|  2025-12-08|   2025-11-24|        7|2025-12-31|
|2025-01-04|  04-01-2025|  2025-12-01|  2025-12-08|   2025-11-24|        7|2025-12-31|
|2025-01-05|  05-01-2025|  2025-12-01|  2025-12-08|   2025-11-24|        7|2025-12-31|
|2025-01-06|  06-01-2025|  2025-12-01|  2025-12-08|   2025-11-24|        7|2025-12-31|
+----------+------------+------------+------------+-------------+---------+----------+
only showing top 4 rows



#### handling null

In [202]:
df1.dropna('any')
df1.dropna('all')
df1.dropna(subset=["customer_name"])
df1.fillna("not available")
df1.fillna("Not",subset=["customer_name"])

DataFrame[sale_id: int, date: date, customer_name: string, product: string, amount: double]

In [203]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *

spark = SparkSession.builder.appName("file_read").getOrCreate()
data_f = spark.read.format('csv')\
                .option("inferSchema",True)\
                .option('header',True)\
                .load("C:/Git files/My git files/PySpark/files/friday_sale.csv")
data_f = data_f.withColumn("Full_name",concat_ws(" ",col("first_name"),col("last_name")))


#### split, indexing

In [204]:
data_f = data_f.withColumn("split_name",split(col("Full_name")," "))
data_f = data_f.withColumn("f_name",split(col("Full_name")," ")[0])\
               .withColumn("l_name",split(col("Full_name")," ")[1])
data_f.select("split_name","f_name","l_name").show(4)
data_f = data_f.drop("f_name","l_name")

+---------------+------+------+
|     split_name|f_name|l_name|
+---------------+------+------+
|  [Arun, Kumar]|  Arun| Kumar|
|[Meena, Sharma]| Meena|Sharma|
| [John, Mathew]|  John|Mathew|
| [Priya, Reddy]| Priya| Reddy|
+---------------+------+------+
only showing top 4 rows



#### Exploade

In [205]:
data_f = data_f.withColumn("explode_data",explode(col("split_name")))
data_f.show(6)

+-------+----------+----------+---------+--------+------+------------+---------------+------------+
|sale_id|      date|first_name|last_name| product|amount|   Full_name|     split_name|explode_data|
+-------+----------+----------+---------+--------+------+------------+---------------+------------+
|      1|2025-01-03|      Arun|    Kumar|  Laptop| 56000|  Arun Kumar|  [Arun, Kumar]|        Arun|
|      1|2025-01-03|      Arun|    Kumar|  Laptop| 56000|  Arun Kumar|  [Arun, Kumar]|       Kumar|
|      2|2025-01-04|     Meena|   Sharma|   Mouse|   800|Meena Sharma|[Meena, Sharma]|       Meena|
|      2|2025-01-04|     Meena|   Sharma|   Mouse|   800|Meena Sharma|[Meena, Sharma]|      Sharma|
|      3|2025-01-05|      John|   Mathew|Keyboard|  1500| John Mathew| [John, Mathew]|        John|
|      3|2025-01-05|      John|   Mathew|Keyboard|  1500| John Mathew| [John, Mathew]|      Mathew|
+-------+----------+----------+---------+--------+------+------------+---------------+------------+


#### Array Contains

In [206]:
data_f = data_f.withColumn("array_contains",array_contains(col("split_name"),"Kumar"))
data_f.show(4)
data_f = data_f.drop("array_contains","explode_data","split_name")
data_f.printSchema()

+-------+----------+----------+---------+-------+------+------------+---------------+------------+--------------+
|sale_id|      date|first_name|last_name|product|amount|   Full_name|     split_name|explode_data|array_contains|
+-------+----------+----------+---------+-------+------+------------+---------------+------------+--------------+
|      1|2025-01-03|      Arun|    Kumar| Laptop| 56000|  Arun Kumar|  [Arun, Kumar]|        Arun|          true|
|      1|2025-01-03|      Arun|    Kumar| Laptop| 56000|  Arun Kumar|  [Arun, Kumar]|       Kumar|          true|
|      2|2025-01-04|     Meena|   Sharma|  Mouse|   800|Meena Sharma|[Meena, Sharma]|       Meena|         false|
|      2|2025-01-04|     Meena|   Sharma|  Mouse|   800|Meena Sharma|[Meena, Sharma]|      Sharma|         false|
+-------+----------+----------+---------+-------+------+------------+---------------+------------+--------------+
only showing top 4 rows

root
 |-- sale_id: integer (nullable = true)
 |-- date: date (n

#### Group By

In [207]:
data_f.groupBy("product").agg(max("amount").alias("maximun_amount")\
                             ,min("amount").alias("minmun_amount")\
                             ,sum("amount").alias("sum_amount")).show()

+---------+--------------+-------------+----------+
|  product|maximun_amount|minmun_amount|sum_amount|
+---------+--------------+-------------+----------+
|Earphones|          1200|         1100|      4600|
|  Speaker|          2600|         2500|     10200|
|   Webcam|          3000|         2800|     11600|
|   Laptop|         60000|        53000|    456000|
|    Mouse|           850|          800|      3300|
|   Tablet|         18000|        17500|     71000|
|  Printer|          9500|         9000|     37000|
| Keyboard|          1600|         1500|      6200|
|  Monitor|          7200|         7100|     28600|
+---------+--------------+-------------+----------+

