<a href="https://colab.research.google.com/github/AnujGogate/DE/blob/main/Pyspark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

##Spark Boiler Plate Code

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
!tar xf spark-3.1.1-bin-hadoop3.2.tgz
!pip install -q findspark

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"

import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
spark.conf.set("spark.sql.repl.eagerEval.enabled", True) # Property used to format output tables better

from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType, BooleanType, FloatType, MapType, ArrayType, DateType, LongType
from pyspark.sql.functions import *
from pyspark.sql.window import *
from pyspark.sql import *
spark

##Spark Essentials

###Word Count

In [None]:
text = "Hello, World How are you. I'm fine, but tell me how are you too. HeLLo,"

In [None]:
rdd_1 = spark.sparkContext.parallelize(text).flatMap(lambda x : x.lower().split(" "),1)

rdd_2 = rdd_1.map(lambda x : (x,1))

rdd_3 = rdd_2.reduceByKey(lambda x,y : x+y).sortByKey().collect()

print(rdd_3)

[('', 28), ("'", 1), (',', 3), ('.', 2), ('a', 2), ('b', 1), ('d', 1), ('e', 7), ('f', 1), ('h', 4), ('i', 2), ('l', 7), ('m', 2), ('n', 1), ('o', 9), ('r', 3), ('t', 3), ('u', 3), ('w', 3), ('y', 2)]


###Handling NULL's

In [None]:
data = [(1, 'Anuj', None), (2, 'Rahul', 102), (3, 'John', 101)]

df = spark.createDataFrame(data, schema = ('id', 'name', 'dept'))

In [None]:
from pyspark.sql.functions import col, coalesce, avg

#df1 = df.fillna("unknown",subset=["dept"])

df2 = df.withColumn("dept",when(col("dept").isNull(),"Unknown").otherwise(col("dept")))

df3 = df.withColumn("dept",coalesce(col("dept"),lit("Unknown")))

df3.show()

df.select(df.name, isnull(df.dept).alias('isNull?'))

+---+-----+-------+
| id| name|   dept|
+---+-----+-------+
|  1| Anuj|Unknown|
|  2|Rahul|    102|
|  3| John|    101|
+---+-----+-------+



name,isNull?
Anuj,True
Rahul,False
John,False


###Dropping NULL's

In [None]:
data = [('Alice', 80, 10), ('Bob', None, 5), ('Tom', 50, 50), (None, None, None), ('Robert', 30, 35)] ; schema = 'name string, age int, height int'

df = spark.createDataFrame(data, schema = schema)

In [None]:
avg_age = df1.select(floor(avg("age"))).collect()[0][0]

df1 = df.fillna(avg_age,subset = ["age"])

df2 = df1.dropna()

df2.show()

+------+---+------+
|  name|age|height|
+------+---+------+
| Alice| 80|    10|
|   Bob| 53|     5|
|   Tom| 50|    50|
|Robert| 30|    35|
+------+---+------+



###Create Dataframe with range 100 records and filter records from 40 to 80

In [None]:
df = spark.createDataFrame([(x,) for x in range(1,11)], schema = "values int")

df.show()

+------+
|values|
+------+
|     1|
|     2|
|     3|
|     4|
|     5|
|     6|
|     7|
|     8|
|     9|
|    10|
+------+



In [None]:
df1 = df.filter((col("values") >= 4) & (col("values") <= 8))

df2 = df.where(col("values").between(4,8))

df2.show()

+------+
|values|
+------+
|     4|
|     5|
|     6|
|     7|
|     8|
+------+



###Merge Two Array Dataframe



In [None]:
data_1 = [(1, 'Anuj', 103), (2, 'William', 102), (3, 'John', 101)]

data_2 = [(4, 'Sam', 105), (5, 'Rocky', 106), (6, 'Steve', 107)]

df_1 = spark.createDataFrame(data_1, schema = ('id', 'name', 'dept'))

df_2 = spark.createDataFrame(data_2, schema = ('id', 'name', 'dept'))

In [None]:
df1 = df_1.withColumn("location",lit(None))

df2 = df1.unionByName(df_2,allowMissingColumns=True)

df2.show()

+---+-------+----+--------+
| id|   name|dept|location|
+---+-------+----+--------+
|  1|   Anuj| 103|    null|
|  2|William| 102|    null|
|  3|   John| 101|    null|
|  4|    Sam| 105|    null|
|  5|  Rocky| 106|    null|
|  6|  Steve| 107|    null|
+---+-------+----+--------+



In [None]:
data = [("A", ["Anuj", "John"], ["Gogate", "Wick"]), ("B", ["Bruce", "Clark"], ["Wayne", "Kent"])]
columns = ["symbol", "firstname", "lastname"]

# Create a DataFrame from the input data
df = spark.createDataFrame(data, columns)

df.show()

+------+--------------+--------------+
|symbol|     firstname|      lastname|
+------+--------------+--------------+
|     A|  [Anuj, John]|[Gogate, Wick]|
|     B|[Bruce, Clark]| [Wayne, Kent]|
+------+--------------+--------------+



In [None]:
df1 = df.select(df.symbol,df.firstname,df.lastname,explode(arrays_zip(df.firstname,df.lastname)).alias("full_name"))

df1.show()

+------+--------------+--------------+--------------+
|symbol|     firstname|      lastname|     full_name|
+------+--------------+--------------+--------------+
|     A|  [Anuj, John]|[Gogate, Wick]|{Anuj, Gogate}|
|     A|  [Anuj, John]|[Gogate, Wick]|  {John, Wick}|
|     B|[Bruce, Clark]| [Wayne, Kent]|{Bruce, Wayne}|
|     B|[Bruce, Clark]| [Wayne, Kent]| {Clark, Kent}|
+------+--------------+--------------+--------------+



###Extract Name from email id

In [None]:
info = [{"empId" : "AG255120", "email": "anuj.gogate@teradata.com"},
        {"empId" : "FS250040", "email": "john.bravo@teradata.com"}]

df = spark.createDataFrame(info)

df.show(truncate = False)

+------------------------+--------+
|email                   |empId   |
+------------------------+--------+
|anuj.gogate@teradata.com|AG255120|
|john.bravo@teradata.com |FS250040|
+------------------------+--------+



In [None]:
from pyspark.sql.functions import split, col, initcap, concat_ws

# Split the email column by "@" and take the first part
df1 = df.withColumn("name_split", split(col("email"), "@")[0])

# Split the first part by "." to separate the first and last names
df2 = df1.withColumn("name_parts", split(col("name_split"), "\\."))

# Capitalize the first and last names, and concatenate them with a space in between
df3 = df2.withColumn("Full_name", concat_ws(" ", initcap(col("name_parts")[0]), initcap(col("name_parts")[1])))

# Select only the required columns
final_df = df3.select("email", "empId", "Full_name")

df2.show(truncate=False)


+------------------------+--------+-----------+--------------+
|email                   |empId   |name_split |name_parts    |
+------------------------+--------+-----------+--------------+
|anuj.gogate@teradata.com|AG255120|anuj.gogate|[anuj, gogate]|
|john.bravo@teradata.com |FS250040|john.bravo |[john, bravo] |
+------------------------+--------+-----------+--------------+



In [None]:
@udf (returnType = StringType())
def get_names(p_str):
  str1 = p_str.split("@")[0].split(".")
  return str1[0][0].upper() + str1[0][1:] + " " + str1[1][0].upper() + str1[1][1:]

df1 = df.select("*",get_names("email").alias("Full_name")).show()

+--------------------+--------+-----------+
|               email|   empId|  Full_name|
+--------------------+--------+-----------+
|anuj.gogate@terad...|AG255120|Anuj Gogate|
|john.bravo@terada...|FS250040| John Bravo|
+--------------------+--------+-----------+



In [None]:
df1 = df.withColumn("full_name", \
      concat_ws(" ", \
      (initcap(split(col("email"),"[@.]")[0])), \
      (initcap(split(col("email"),"[@.]")[1])) \
      ))

df1.show(truncate = False)

+------------------------+--------+-----------+
|email                   |empId   |full_name  |
+------------------------+--------+-----------+
|anuj.gogate@teradata.com|AG255120|Anuj Gogate|
|john.bravo@teradata.com |FS250040|John Bravo |
+------------------------+--------+-----------+



###Renaming columns

In [None]:
data = [('a', 100, 'HR'), ('b', 200, 'Manager'), ('c', 300, 'Manager'), ('d', 400, 'HR'), ('e', 500, 'HR'), ('f', 600, 'Manager')]

schema = ["name", "salary", "dept"]

df = spark.createDataFrame(data, schema = schema)

df.orderBy('dept')

name,salary,dept
e,500,HR
a,100,HR
d,400,HR
f,600,Manager
b,200,Manager
c,300,Manager


In [None]:
new_col = "new_"

df1 = df.select([df[x].alias(f"{new_col}{x}")  for x in df.columns])

df1

new_name,new_salary,new_dept
a,100,HR
b,200,Manager
c,300,Manager
d,400,HR
e,500,HR
f,600,Manager


###Spliting based on Pipe

In [None]:
data = [
    (1, "a|b"),
    (2, "a|e|d"),
    (3, "f")
]

schema = ["c1", "c2"]

# Create DataFrame
df = spark.createDataFrame(data, schema=schema)

# Show the DataFrame
df.show(truncate=False)

+---+-----+
|c1 |c2   |
+---+-----+
|1  |a|b  |
|2  |a|e|d|
|3  |f    |
+---+-----+



In [None]:
df2 = df.withColumn("c2",explode(split(col("c2"),"\|"))).show()

"""
Key Difference:
"|": Treated as a regular expression (logical OR operator).
"\\|": Escaped pipe, treated as a literal character.

Best Practice:
When working with characters that have special meanings in regular expressions, such as |, ., *, +, etc., always escape them using a backslash (\) if you want to treat them literally.
"""

+---+---+
| c1| c2|
+---+---+
|  1|  a|
|  1|  b|
|  2|  a|
|  2|  e|
|  2|  d|
|  3|  f|
+---+---+



'\nKey Difference:\n"|": Treated as a regular expression (logical OR operator).\n"\\|": Escaped pipe, treated as a literal character.\n\nBest Practice:\nWhen working with characters that have special meanings in regular expressions, such as |, ., *, +, etc., always escape them using a backslash (\\) if you want to treat them literally.\n'

###Filter based on comments in a column

In [None]:
schema = StructType([
    StructField("user_id", IntegerType(), True),
    StructField("date_searched", StringType(), True),
    StructField("filter_room_types", StringType(), True)
])
# Data
data = [
    (1, "2022-01-01", "entire home,couple room,private room"),
    (2, "2022-01-02", "entire home,shared room"),
    (3, "2022-01-02", "private room"),
    (4, "2022-01-03", "private room"),
    (5, "2022-01-04", "entire home,private room,shared room,couple room"),
    (6, "2022-01-05", "entire home,shared room"),
    (7, "2022-01-06", "private room,couple room,private room"),
    (8, "2022-01-07", "entire home,shared room"),
    (9, "2022-01-08", "private room,shared room"),
    (10, "2022-01-09", "entire home")
]
# Create DataFrame
df = spark.createDataFrame(data, schema=schema)

# Show DataFrame
df.show(truncate=False)

+-------+-------------+------------------------------------------------+
|user_id|date_searched|filter_room_types                               |
+-------+-------------+------------------------------------------------+
|1      |2022-01-01   |entire home,couple room,private room            |
|2      |2022-01-02   |entire home,shared room                         |
|3      |2022-01-02   |private room                                    |
|4      |2022-01-03   |private room                                    |
|5      |2022-01-04   |entire home,private room,shared room,couple room|
|6      |2022-01-05   |entire home,shared room                         |
|7      |2022-01-06   |private room,couple room,private room           |
|8      |2022-01-07   |entire home,shared room                         |
|9      |2022-01-08   |private room,shared room                        |
|10     |2022-01-09   |entire home                                     |
+-------+-------------+----------------------------

In [None]:
df1 = df.withColumn("room_types",explode(split("filter_room_types",","))) \
        .filter(col("room_types").like("%room"))

df1.show()

+-------+-------------+--------------------+------------+
|user_id|date_searched|   filter_room_types|  room_types|
+-------+-------------+--------------------+------------+
|      1|   2022-01-01|entire home,coupl...| couple room|
|      1|   2022-01-01|entire home,coupl...|private room|
|      2|   2022-01-02|entire home,share...| shared room|
|      3|   2022-01-02|        private room|private room|
|      4|   2022-01-03|        private room|private room|
|      5|   2022-01-04|entire home,priva...|private room|
|      5|   2022-01-04|entire home,priva...| shared room|
|      5|   2022-01-04|entire home,priva...| couple room|
|      6|   2022-01-05|entire home,share...| shared room|
|      7|   2022-01-06|private room,coup...|private room|
|      7|   2022-01-06|private room,coup...| couple room|
|      7|   2022-01-06|private room,coup...|private room|
|      8|   2022-01-07|entire home,share...| shared room|
|      9|   2022-01-08|private room,shar...|private room|
|      9|   20

###Finding Average Salary

In [38]:
# add a flag which indicates if the salary of employee is above or below average
data = [('a', 100, 'HR'), ('b', 200, 'Manager'), ('c', 300, 'Manager'), ('d', 400, 'HR'), ('e', 500, 'HR'), ('f', 600, 'Manager')]

schema = ["name", "salary", "dept"]

df = spark.createDataFrame(data, schema = schema)

df.orderBy('dept')

name,salary,dept
a,100,HR
e,500,HR
d,400,HR
b,200,Manager
f,600,Manager
c,300,Manager


In [41]:
avg_salary = df.agg(floor(avg("salary"))).collect()[0][0]

df1 = df.withColumn("salary_chk",when(col("salary") > avg_salary,"Salary is greater than average").otherwise("Salary is under Average")).show(7,False)

+----+------+-------+------------------------------+
|name|salary|dept   |salary_chk                    |
+----+------+-------+------------------------------+
|a   |100   |HR     |Salary is under Average       |
|b   |200   |Manager|Salary is under Average       |
|c   |300   |Manager|Salary is under Average       |
|d   |400   |HR     |Salary is greater than average|
|e   |500   |HR     |Salary is greater than average|
|f   |600   |Manager|Salary is greater than average|
+----+------+-------+------------------------------+



##Tiger Analytics

###Count of each Join

In [None]:
# Define data for Table A and Table B
data_A = [(1,), (0,), (0,), (None,), (1,), (None,), (0,)]
data_B = [(1,), (0,), (1,), (None,), (1,), (0,)]

# Define schemas for Table A and Table B
columns_A = ["A"]
columns_B = ["B"]

# Create DataFrames for Table A and Table B
df_A = spark.createDataFrame(data_A, columns_A)
df_B = spark.createDataFrame(data_B, columns_B)

+-------+------+-----------+
|cust_id|origin|destination|
+-------+------+-----------+
|      1| Delhi|  Mangalore|
|      2|Mumbai|    Chennai|
+-------+------+-----------+



In [None]:
# Perform the various joins and unions

# 1. Inner Join
inner_join_df = df_A.join(df_B, df_A.A == df_B.B, "inner")
print("Inner Join Result:")
inner_join_df.show()

# 2. Left Join
left_join_df = df_A.join(df_B, df_A.A == df_B.B, "left")
print("Left Join Result:")
left_join_df.show()

# 3. Right Join
right_join_df = df_A.join(df_B, df_A.A == df_B.B, "right")
print("Right Join Result:")
right_join_df.show()

# 4. Full Outer Join
outer_join_df = df_A.join(df_B, df_A.A == df_B.B, "outer")
print("Full Outer Join Result:")
outer_join_df.show()

# 5. Union (removing duplicates)
union_df = df_A.union(df_B).distinct()
print("Union Result:")
union_df.show()

# 6. Union All (including duplicates)
union_all_df = df_A.union(df_B)
print("Union All Result:")
union_all_df.show()


###finding the first origin and final destination for each customer based on a sequence of flights taken.

In [None]:
flights_data = [(1,'Flight1' , 'Delhi' , 'Hyderabad'),
 (1,'Flight2' , 'Hyderabad' , 'Kochi'),
 (1,'Flight3' , 'Kochi' , 'Mangalore'),
 (2,'Flight1' , 'Mumbai' , 'Ayodhya'),
 (2,'Flight2' , 'Ayodhya' , 'Chennai')
 ]

my_schema = "cust_id int, flight_id string , origin string , destination string"

df = spark.createDataFrame(data = flights_data, schema = my_schema)

+-------+------+-----------+
|cust_id|origin|destination|
+-------+------+-----------+
|      1| Delhi|  Mangalore|
|      2|Mumbai|    Chennai|
+-------+------+-----------+



In [None]:
#spec = Window.partitionBy("cust_id").orderBy("flight_id")

df1 = df.groupBy("cust_id").agg(first("origin").alias("origin"),last("destination").alias("destination"))

df1.show()

###Calculate the TotalQty, TotalSales, and MoMProfit for each mont

In [None]:
# Initialize Spark session
spark = SparkSession.builder.appName("SalesData").getOrCreate()

# Define the raw data
data = [
    ("Ramesh", 20, "Jan", 50000),
    ("Deep", 25, "Jan", 30000),
    ("Suresh", 22, "Feb", 50000),
    ("Ram", 28, "Feb", 20000),
    ("Pradeep", 22, "Feb", 20000),
    ("Deep", 25, "Mar", 30000),
    ("Suresh", 22, "Mar", 50000),
    ("Ram", 28, "Mar", 20000),
    ("Pradeep", 22, "Mar", 20000)
]

# Define column names
columns = ["Name", "Qty", "Month", "Sales"]

# Create DataFrame
df = spark.createDataFrame(data, schema=columns)

In [None]:
# Step 1: Aggregate data by Month to get TotalQty and TotalSales
monthly_df = df.groupBy("Month") \
               .agg(
                   sum("Qty").alias("TotalQty"),
                   sum("Sales").alias("TotalSales")
               )

# Step 2: Define a window to calculate the previous month's sales for MoMProfit calculation
windowSpec = Window.orderBy("Month")

# Step 3: Calculate MoMProfit
result_df = monthly_df.withColumn("PrevMonthSales", lag("TotalSales").over(windowSpec)) \
                      .withColumn("MoMProfit", col("TotalSales") - col("PrevMonthSales")) \
                      .select("Month", "TotalQty", "TotalSales", "MoMProfit")

# Show the result
result_df.show()

+-----+--------+----------+--------------+
|Month|TotalQty|TotalSales|PrevMonthSales|
+-----+--------+----------+--------------+
|  Feb|      72|     90000|          null|
|  Jan|      45|     80000|         90000|
|  Mar|      97|    120000|         80000|
+-----+--------+----------+--------------+

