
# SparkApp creation

SparkContext creation prior to PySpark 2.0 (before SparkSession was introduced)

In [None]:
# from pyspark import SparkConf, SparkContext
# conf = SparkConf()
# conf.setMaster("local").setAppName("SparkAppName")
# sc = SparkContext.getOrCreate(conf)
# print(sc.appName)

In [166]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
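# Wildcard import kept for brevity; note that it shadows Python built-ins such as sum, min, max and round
from pyspark.sql.functions import *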

# from google.colab import drive # Uncomment if you're using Google Colab and need to mount your drive

# drive.mount('/content/drive') # Uncomment and run if you're using Google Colab

# Create a SparkSession
# If one already exists, Spark will use it; otherwise, it creates a new one.
spark = SparkSession.builder.appName("SparkApplication").getOrCreate()

# Now you can access the SparkContext if needed via spark.sparkContext
sc = spark.sparkContext

In [151]:

# Stop the SparkContext only when you are finished; afterwards this SparkSession can no longer run jobs
#spark.sparkContext.stop()


# RDD

PySpark RDD – Resilient Distributed Dataset

An RDD is Spark's fundamental data structure: a fault-tolerant, immutable, distributed collection of objects. Because RDDs are immutable, they cannot be changed once created, and any transformation on an RDD produces a new RDD. If a partition is lost, Spark recomputes it from the RDD's lineage (the chain of transformations that produced it). Each RDD is divided into logical partitions, which can be computed on different nodes of the cluster.

In [165]:
# Specify the path to your CSV file
# Replace the path below with the actual path to your file
# Example path if in Google Drive: '/content/drive/My Drive/csv_file.csv'
# Example path if uploaded directly to Colab: '/content/csv_file.csv'
csv_file_path = '/content/sample_data/lowes_home_appliances.csv'

# Load the CSV file into an RDD of lines (one string per line, header included)
csv_rdd = sc.textFile(csv_file_path)

# You can now work with csv_rdd


There are two types of RDD operations:

1.   Transformations
2.   Actions

RDD transformations in PySpark are lazy: they only record what should be computed, and Spark executes them when an action is called on the resulting RDD.

Common transformations include:


*   map, filter, flatMap, groupByKey, reduceByKey, join, union, sortByKey, distinct, sample, mapPartitions, and aggregateByKey.


Each of these transformations describes a computation that Spark runs in a distributed manner across the cluster, and each returns a new RDD rather than modifying the existing one.

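RDD actions in PySpark trigger the computation. They either return results to the Spark driver (for example, collect and count) or write them to external storage (for example, saveAsTextFile).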



*   collect, count, take, reduce, foreach, first, takeOrdered, takeSample, countByKey, saveAsTextFile, saveAsSequenceFile, saveAsObjectFile, foreachPartition, collectAsMap, aggregate, and fold.



Note: actions start execution and materialize RDD data. As a rule of thumb, any RDD operation that returns something other than an RDD is an action.

In [46]:
# Display the first 10 lines (the first line is the CSV header)

csv_rdd.take(10)

['CATEGORY,DATE_SCRAPED,SORT_BY,RUN_START_DATE,SUBCATEGORY,SHIPPING_LOCATION,SKU,COUNTRY,BRAND,PRICE_RETAIL,PRICE_CURRENT,SELLER,PRODUCT_URL,CURRENCY,BREADCRUMBS,DEPARTMENT,PROMOTION,BESTSELLER_RANK,PRODUCT_NAME,WEBSITE_URL',
 'Refrigerators,2022-04-18T16:54:01+8:00,Best Sellers,2022-04-18T16:45:50+8:00,Bottom-Freezer Refrigerators,06854,5001252387,USA,Hisense,999.00,999.00,LOWES,https://www.lowes.com/pd/Hisense-17-2-cu-ft-Bottom-Freezer-Refrigerator-Stainless-steel-ENERGY-STAR/5001252387,USD,Appliances>Refrigerators>Bottom-Freezer Refrigerators,Appliances,,1,17.2-cu ft Counter-depth Bottom-Freezer Refrigerator (Fingerprint Resistant Stainless Steel) ENERGY STAR,http://www.lowes.com/',
 'Refrigerators,2022-04-18T16:54:01+8:00,Best Sellers,2022-04-18T16:45:50+8:00,Bottom-Freezer Refrigerators,06854,1002543648,USA,LG,1799.00,1599.00,LOWES,https://www.lowes.com/pd/LG-25-5-cu-ft-Bottom-Freezer-Refrigerator-with-Ice-Maker-Fingerprint-Resistant-Printproof-Stainless-Steel-ENERGY-STAR/10025436

# DataFrame

PySpark DataFrame

A DataFrame is a distributed collection of data organized into rows and named columns. It is conceptually similar to a table in a relational database or a pandas DataFrame. Unlike those, Spark optimizes DataFrame queries with its Catalyst query optimizer before executing them.

In [154]:
# Create DataFrame
data = [('John','','Cena','1991-01-01','M',3000),
  ('Michael','Rose','','2000-05-19','M',4000),
  ('Robert','','Williams','1978-09-05','M',4000),
  ('Maria','Anne','Jones','1967-12-01','F',4000),
  ('Jen','Mary','Brown','1980-02-17','F',-1)
]

columns = ["firstname","middlename","lastname","DOB","gender","salary"]
list_df = spark.createDataFrame(data=data, schema = columns)

In [149]:
list_df.printSchema()

root
 |-- firstname: string (nullable = true)
 |-- middlename: string (nullable = true)
 |-- lastname: string (nullable = true)
 |-- DOB: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: long (nullable = true)



In [49]:
list_df.show(10)

+---------+----------+--------+----------+------+------+
|firstname|middlename|lastname|       DOB|gender|salary|
+---------+----------+--------+----------+------+------+
|     John|          |    Cena|1991-01-01|     M|  3000|
|  Michael|      Rose|        |2000-05-19|     M|  4000|
|   Robert|          |Williams|1978-09-05|     M|  4000|
|    Maria|      Anne|   Jones|1967-12-01|     F|  4000|
|      Jen|      Mary|   Brown|1980-02-17|     F|    -1|
+---------+----------+--------+----------+------+------+



Supported file formats

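Apache Spark supports a rich set of APIs to read and write many file formats, including:

*   Text files (.txt)
*   CSV files (.csv)
*   TSV files (.tsv), read with the CSV reader and a tab separator
*   Avro files (.avro), which require the spark-avro package
*   JSON files (.json)
*   Parquet files (.parquet)
*   ORC files (.orc)
*   XML files (through the spark-xml package, built in from Spark 4.0), and many other formats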

Creating a DataFrame from Files

In [50]:
# Create DataFrame from CSV file
csv_df = spark.read.csv("/content/sample_data/lowes_home_appliances.csv", header=True, inferSchema=True)

# Nested Column

In [51]:
data_to_nest = [(('James','','Smith'),'1991-04-01','M',3000),
  (('Michael','Rose',''),'2000-05-19','M',4000),
  (('Robert','','Williams'),'1978-09-05','M',4000),
  (('Maria','Anne','Jones'),'1967-12-01','F',4000),
  (('Jen','Mary','Brown'),'1980-02-17','F',-1)
]

In [52]:
schema = StructType([
        StructField('name', StructType([
             StructField('firstname', StringType(), True),
             StructField('middlename', StringType(), True),
             StructField('lastname', StringType(), True)
             ])),
         StructField('dob', StringType(), True),
         StructField('gender', StringType(), True),
         StructField('salary', IntegerType(), True)
         ])

In [53]:
nested_df = spark.createDataFrame(data = data_to_nest, schema = schema)
nested_df.printSchema()

root
 |-- name: struct (nullable = true)
 |    |-- firstname: string (nullable = true)
 |    |-- middlename: string (nullable = true)
 |    |-- lastname: string (nullable = true)
 |-- dob: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: integer (nullable = true)



# Column Operations

withColumnRenamed()

PySpark withColumnRenamed() renames a single column. Chain several calls to rename multiple columns.

In [74]:
nested_df.withColumnRenamed("dob","DateOfBirth").printSchema()

root
 |-- name: struct (nullable = true)
 |    |-- firstname: string (nullable = true)
 |    |-- middlename: string (nullable = true)
 |    |-- lastname: string (nullable = true)
 |-- DateOfBirth: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: integer (nullable = true)



In [75]:
multiple_column_renamed_df = nested_df.withColumnRenamed("dob","DateOfBirth") \
    .withColumnRenamed("salary","salary_amount")
multiple_column_renamed_df.printSchema()

root
 |-- name: struct (nullable = true)
 |    |-- firstname: string (nullable = true)
 |    |-- middlename: string (nullable = true)
 |    |-- lastname: string (nullable = true)
 |-- DateOfBirth: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary_amount: integer (nullable = true)



In [76]:
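# Target struct with new field names; casting the 'name' struct to it renames the nested fields by position
schema2 = StructType([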
    StructField("fname",StringType()),
    StructField("midname",StringType()),
    StructField("lname",StringType())])

In [77]:
nested_df.select(col("name").cast(schema2), \
     col("dob"), col("gender"),col("salary")) \
   .printSchema()

root
 |-- name: struct (nullable = true)
 |    |-- fname: string (nullable = true)
 |    |-- midname: string (nullable = true)
 |    |-- lname: string (nullable = true)
 |-- dob: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: integer (nullable = true)



Flattening the nested structure into top-level columns.

In [85]:
nested_df.select(col("name.firstname").alias("fname"), \
  col("name.middlename").alias("mname"), \
  col("name.lastname").alias("lname"), \
  col("dob"),col("gender"),col("salary")) \
  .printSchema()

root
 |-- fname: string (nullable = true)
 |-- mname: string (nullable = true)
 |-- lname: string (nullable = true)
 |-- dob: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: integer (nullable = true)



In [86]:
# Define the new column names (toDF renames columns by position, not by name)
newColumns = ["firstname", "middlename", "lastname", "DOB", "Gender", "Salary"]
# Flatten nested_df first so its column order matches newColumns
flat_df = nested_df.select(col("name.firstname"), col("name.middlename"),
                           col("name.lastname"), col("dob"), col("gender"), col("salary"))
renamed_df = flat_df.toDF(*newColumns)

# Print the schema to verify
renamed_df.printSchema()

root
 |-- firstname: string (nullable = true)
 |-- middlename: string (nullable = true)
 |-- lastname: string (nullable = true)
 |-- DOB: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Salary: integer (nullable = true)



In [87]:
df2 = nested_df.withColumn("fname",col("name.firstname")) \
      .withColumn("mname",col("name.middlename")) \
      .withColumn("lname",col("name.lastname")) \
      .drop("name")
df2.printSchema()

root
 |-- dob: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: integer (nullable = true)
 |-- fname: string (nullable = true)
 |-- mname: string (nullable = true)
 |-- lname: string (nullable = true)



# withColumn()

In [118]:
data = [('James','','Smith','1991-04-01','M',3000),
  ('Michael','Rose','','2000-05-19','M',4000),
  ('Robert','','Williams','1978-09-05','M',4000),
  ('Maria','Anne','Jones','1967-12-01','F',4000),
  ('Jen','Mary','Brown','1980-02-17','F',1000)
]

columns = ["firstname","middlename","lastname","dob","gender","salary"]
column_df = spark.createDataFrame(data=data, schema = columns)

Update Column Datatype (salary is inferred as long; cast it to integer)

In [119]:
column_df.withColumn("salary",col("salary").cast("Integer")).show()

+---------+----------+--------+----------+------+------+
|firstname|middlename|lastname|       dob|gender|salary|
+---------+----------+--------+----------+------+------+
|    James|          |   Smith|1991-04-01|     M|  3000|
|  Michael|      Rose|        |2000-05-19|     M|  4000|
|   Robert|          |Williams|1978-09-05|     M|  4000|
|    Maria|      Anne|   Jones|1967-12-01|     F|  4000|
|      Jen|      Mary|   Brown|1980-02-17|     F|  1000|
+---------+----------+--------+----------+------+------+



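Update Column value (withColumn() with a new name, such as salary_inr below, adds a derived column; passing an existing name such as salary overwrites that column)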

In [123]:
column_df.withColumn("salary_inr",col("salary")*85).show()

+---------+----------+--------+----------+------+------+----------+
|firstname|middlename|lastname|       dob|gender|salary|salary_inr|
+---------+----------+--------+----------+------+------+----------+
|    James|          |   Smith|1991-04-01|     M|  3000|    255000|
|  Michael|      Rose|        |2000-05-19|     M|  4000|    340000|
|   Robert|          |Williams|1978-09-05|     M|  4000|    340000|
|    Maria|      Anne|   Jones|1967-12-01|     F|  4000|    340000|
|      Jen|      Mary|   Brown|1980-02-17|     F|  1000|     85000|
+---------+----------+--------+----------+------+------+----------+



Create a Column from an Existing One (salary after a 15% deduction)

In [134]:
column_df.withColumn("Salary_AfterTax",col("salary")* 0.85).show()

+---------+----------+--------+----------+------+------+---------------+
|firstname|middlename|lastname|       dob|gender|salary|Salary_AfterTax|
+---------+----------+--------+----------+------+------+---------------+
|    James|          |   Smith|1991-04-01|     M|  3000|         2550.0|
|  Michael|      Rose|        |2000-05-19|     M|  4000|         3400.0|
|   Robert|          |Williams|1978-09-05|     M|  4000|         3400.0|
|    Maria|      Anne|   Jones|1967-12-01|     F|  4000|         3400.0|
|      Jen|      Mary|   Brown|1980-02-17|     F|  1000|          850.0|
+---------+----------+--------+----------+------+------+---------------+



Adding a New Column with a Constant Value (using lit())

In [138]:
# df.withColumn("Client_Country", lit("USA")).show()
column_df.withColumn("Client_Country", lit("USA")) \
  .withColumn("Country",lit("India")).show()

+---------+----------+--------+----------+------+------+--------------+-------+
|firstname|middlename|lastname|       dob|gender|salary|Client_Country|Country|
+---------+----------+--------+----------+------+------+--------------+-------+
|    James|          |   Smith|1991-04-01|     M|  3000|           USA|  India|
|  Michael|      Rose|        |2000-05-19|     M|  4000|           USA|  India|
|   Robert|          |Williams|1978-09-05|     M|  4000|           USA|  India|
|    Maria|      Anne|   Jones|1967-12-01|     F|  4000|           USA|  India|
|      Jen|      Mary|   Brown|1980-02-17|     F|  1000|           USA|  India|
+---------+----------+--------+----------+------+------+--------------+-------+



drop()

In [144]:
salary_drop_column_df = column_df.drop("salary")
salary_drop_column_df.show()

+---------+----------+--------+----------+------+
|firstname|middlename|lastname|       dob|gender|
+---------+----------+--------+----------+------+
|    James|          |   Smith|1991-04-01|     M|
|  Michael|      Rose|        |2000-05-19|     M|
|   Robert|          |Williams|1978-09-05|     M|
|    Maria|      Anne|   Jones|1967-12-01|     F|
|      Jen|      Mary|   Brown|1980-02-17|     F|
+---------+----------+--------+----------+------+

