<a href="https://colab.research.google.com/github/Mech123/Data-engineering/blob/main/Pyspark_project.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [4]:
# Install Java
!apt-get install openjdk-11-jdk-headless -qq > /dev/null

# Download and extract Spark
!wget -q https://dlcdn.apache.org/spark/spark-3.5.5/spark-3.5.5-bin-hadoop3.tgz
!tar xf spark-3.5.5-bin-hadoop3.tgz
!mv spark-3.5.5-bin-hadoop3 spark

# Install findspark
!pip install -q findspark


In [5]:
import os

os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark"


In [6]:
import findspark
findspark.init()

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Colab PySpark") \
    .getOrCreate()

spark


In [8]:
# Create DataFrame
data = [('James','','Smith','1991-04-01','M',3000),
  ('Michael','Rose','','2000-05-19','M',4000),
  ('Robert','','Williams','1978-09-05','M',4000),
  ('Maria','Anne','Jones','1967-12-01','F',4000),
  ('Jen','Mary','Brown','1980-02-17','F',-1)
]

columns = ["firstname","middlename","lastname","dob","gender","salary"]
df = spark.createDataFrame(data=data, schema = columns)


In [9]:
df.printSchema()

root
 |-- firstname: string (nullable = true)
 |-- middlename: string (nullable = true)
 |-- lastname: string (nullable = true)
 |-- dob: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: long (nullable = true)



In [10]:
 df.show()

+---------+----------+--------+----------+------+------+
|firstname|middlename|lastname|       dob|gender|salary|
+---------+----------+--------+----------+------+------+
|    James|          |   Smith|1991-04-01|     M|  3000|
|  Michael|      Rose|        |2000-05-19|     M|  4000|
|   Robert|          |Williams|1978-09-05|     M|  4000|
|    Maria|      Anne|   Jones|1967-12-01|     F|  4000|
|      Jen|      Mary|   Brown|1980-02-17|     F|    -1|
+---------+----------+--------+----------+------+------+



In [11]:
# Read CSV File from the "sample" folder
df = spark.read.csv("/content/sample_data/zipcodes.csv", header=True, inferSchema=True)

# Print the schema
df.printSchema()


root
 |-- RecordNumber: integer (nullable = true)
 |-- Zipcode: integer (nullable = true)
 |-- ZipCodeType: string (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- LocationType: string (nullable = true)
 |-- Lat: double (nullable = true)
 |-- Long: double (nullable = true)
 |-- Xaxis: double (nullable = true)
 |-- Yaxis: double (nullable = true)
 |-- Zaxis: double (nullable = true)
 |-- WorldRegion: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- LocationText: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Decommisioned: boolean (nullable = true)
 |-- TaxReturnsFiled: integer (nullable = true)
 |-- EstimatedPopulation: integer (nullable = true)
 |-- TotalWages: integer (nullable = true)
 |-- Notes: string (nullable = true)



# Read multiple CSV files
df = spark.read.csv("path/file1.csv,path/file2.csv,path/file3.csv")


# Read all files from a directory
df = spark.read.csv("Folder path")


# Using delimiter option
df3 = spark.read.options(delimiter=',') \
  .csv("/path/zipcodes.csv")

# Using inferschema and delimiter
df4 = spark.read.options(inferSchema='True',delimiter=',') \
  .csv("/path/zipcodes.csv")



# Chaining multiple options
df4 = spark.read.option("inferSchema",True) \
                .option("delimiter",",") \
  .csv("/path/zipcodes.csv")

# Using header option
df3 = spark.read.options(header='True', inferSchema='True', delimiter=',') \
  .csv("/path/zipcodes.csv")
  

In [12]:
# Reading CSV files with a user-specified custom schema in PySpark involves defining the schema explicitly before loading the data
# Imports
from pyspark.sql.types import StructType,StructField, StringType, IntegerType
from pyspark.sql.types import ArrayType, DoubleType, BooleanType

# Using custom schema
schema = StructType() \
      .add("RecordNumber",IntegerType(),True) \
      .add("Zipcode",IntegerType(),True) \
      .add("ZipCodeType",StringType(),True) \
      .add("City",StringType(),True) \
      .add("State",StringType(),True) \
      .add("LocationType",StringType(),True) \
      .add("Lat",DoubleType(),True) \
      .add("Long",DoubleType(),True) \
      .add("Xaxis",IntegerType(),True) \
      .add("Yaxis",DoubleType(),True) \
      .add("Zaxis",DoubleType(),True) \
      .add("WorldRegion",StringType(),True) \
      .add("Country",StringType(),True) \
      .add("LocationText",StringType(),True) \
      .add("Location",StringType(),True) \
      .add("Decommisioned",BooleanType(),True) \
      .add("TaxReturnsFiled",StringType(),True) \
      .add("EstimatedPopulation",IntegerType(),True) \
      .add("TotalWages",IntegerType(),True) \
      .add("Notes",StringType(),True)

df_with_schema = spark.read.format("csv") \
      .option("header", True) \
      .schema(schema) \
      .load("/content/sample_data/zipcodes.csv")

# Save DataFrame to CSV File
df.write.option("header",True) \
 .csv("/tmp/spark_output/zipcodes")


# Using write options
df2.write.options(header='True', delimiter=',') \
 .csv("/tmp/spark_output/zipcodes")

header: Specifies whether to include a header row with column names in the CSV file.
Example: option("header", "true").
delimiter: Specifies the delimiter to use between fields in the CSV file.
Example: option("delimiter", ",").
quote: Specifies the character used for quoting fields in the CSV file. Example: option("quote", "\"").
escape: Specifies the escape character used in the CSV file.
Example: option("escape", "\\").
nullValue: Specifies the string to represent null values in the CSV file.
Example: option("nullValue", "NA").
dateFormat: Specifies the date format to use for date columns.
Example: option("dateFormat", "yyyy-MM-dd").
mode: Specifies the write mode for the output. Options include “overwrite”, “append”, “ignore”, and “error”.
Example: option("mode", "overwrite").
compression: Specifies the compression codec to use for the output file.
Example: option("compression", "gzip").

In [13]:
df_with_schema.printSchema()

root
 |-- RecordNumber: integer (nullable = true)
 |-- Zipcode: integer (nullable = true)
 |-- ZipCodeType: string (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- LocationType: string (nullable = true)
 |-- Lat: double (nullable = true)
 |-- Long: double (nullable = true)
 |-- Xaxis: integer (nullable = true)
 |-- Yaxis: double (nullable = true)
 |-- Zaxis: double (nullable = true)
 |-- WorldRegion: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- LocationText: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Decommisioned: boolean (nullable = true)
 |-- TaxReturnsFiled: string (nullable = true)
 |-- EstimatedPopulation: integer (nullable = true)
 |-- TotalWages: integer (nullable = true)
 |-- Notes: string (nullable = true)

