In [13]:
import os

import pyspark
from pyspark.sql import SparkSession

# Obtain the SparkSession used by the notebook (getOrCreate returns an
# already-active session when there is one).
spark = (
    SparkSession.builder
    .appName('read_write_Files')
    .getOrCreate()
)

In [9]:
# Generate some sample data and convert to DF

import random
from datetime import datetime, timedelta
from faker import Faker
# Seed both generators so the "random" sample data is identical on every
# Restart & Run All. Change SEED for a different, still reproducible, data set.
SEED = 42
random.seed(SEED)
Faker.seed(SEED)
fake = Faker()

def generate_sample_data(num_entries=5):
    """Build a list of fake employee records for the DataFrame examples.

    Each record is a 10-field tuple, in this order:
    ``(name, age, dept, salary, dob, married, gender, langs, contact, coms)``

    - name: first name produced by the module-level ``fake`` generator
    - age: current year minus the birth year (month/day are not considered)
    - dept: one of five department names
    - salary: float drawn from 40000..120000, rounded to one decimal
    - dob: birth date formatted as ``YYYY-MM-DD``
    - married: bool
    - gender: ``"M"``, ``"F"`` or ``None``
    - langs: list of 2 or 3 distinct language names
    - contact: dict with ``'Phone'`` and ``'Email'`` keys
    - coms: dict with a single ``'Communication'`` key

    Args:
        num_entries: number of records to generate (default 5).

    Returns:
        A list of ``num_entries`` tuples; an empty list when ``num_entries``
        is 0.
    """
    departments = ["HR", "IT", "Finance", "Marketing", "Operations"]
    languages = ["English", "Spanish", "French", "German", "Chinese"]
    skill_levels = ['Excellent', 'Good', 'Average']

    current_year = datetime.now().year
    # Birth dates are drawn up front; the day is limited to 1..28 so the
    # date is valid for every month.
    birth_dates = [
        datetime(
            current_year - random.randint(10, 50),
            random.randint(1, 12),
            random.randint(1, 28),
        )
        for _ in range(num_entries)
    ]

    sample_data = []
    # Iterate over the dates directly instead of indexing a parallel list.
    for birth_date in birth_dates:
        name = fake.first_name()
        age = current_year - birth_date.year
        dept = random.choice(departments)
        salary = round(random.uniform(40000, 120000), 1)
        dob = birth_date.strftime("%Y-%m-%d")
        married = random.choice([True, False])
        gender = random.choice(["M", "F", None])
        # array of languages spoken (2 or 3, without repeats)
        langs = random.sample(languages, random.randint(2, 3))
        # contact information (dictionary)
        contact = {'Phone': fake.phone_number(), 'Email': fake.email()}
        # skills (dictionary)
        coms = {'Communication': random.choice(skill_levels)}

        sample_data.append(
            (name, age, dept, salary, dob, married, gender, langs, contact, coms)
        )

    return sample_data

# Build the 100-row sample data set (a list of tuples) for the examples
data = generate_sample_data(num_entries=100)

In [None]:
# Create a user-specified custom schema

# PySpark SQL provides StructType & StructField classes to programmatically specify the structure to the DataFrame.
from pyspark.sql.types import StructType,StructField
# Class to define the structure of the DataFrame.
from pyspark.sql.types import StringType, IntegerType, FloatType, BooleanType, DateType, ArrayType, MapType

# A StructField defines one column: its name (string), its type (DataType), whether it is nullable (boolean), and optional metadata.

# Define the custom schema for the DataFrame.
# Field order must match the tuple layout returned by generate_sample_data();
# every column is declared nullable (third argument True).
schema = StructType([
    StructField('Name', StringType(), True),
    StructField('Age', IntegerType(), True),
    StructField('Dept', StringType(), True),
    # Salary is generated as a Python float, so it is declared numeric here;
    # declaring it StringType would leave the column as text and unusable
    # for arithmetic without a cast. FloatType is 32-bit, which is enough for
    # one-decimal values in this range.
    StructField('Salary', FloatType(), True),
    # Birth stays a "YYYY-MM-DD" string because the generator formats it so
    StructField('Birth', StringType(), True),
    StructField("Married", BooleanType(), True),
    StructField('Gender', StringType(), True),
    StructField('Languages', ArrayType(StringType()), True),
    StructField('Contact', MapType(StringType(), StringType()), True),
    StructField('Skills', MapType(StringType(), StringType()), True),
])

# Build the DataFrame once from the generated rows and the explicit schema.
# `deptDF` and `df` are two names for that same DataFrame.
deptDF = spark.createDataFrame(data, schema=schema)
df = deptDF

# Column names, in schema order, extracted from the schema as a plain list
schema_array = [field.name for field in schema.fields]

# Inspect the structure and a small sample of rows
df.printSchema()
df.show(5)

In [15]:
from pyspark.sql.functions import col, concat_ws,  to_json

# Save to CSV
# CSV cannot hold complex types such as arrays or maps, so those columns are
# serialised to JSON strings first.
df_csv = (
    df.withColumn("Languages", to_json("Languages"))
      .withColumn("Contact", to_json("Contact"))
      .withColumn("Skills", to_json("Skills"))
)

# Spark writes one part file per partition. coalesce(1) reduces the data to a
# single partition so that each output directory holds a single part file.
df_csv_single = df_csv.coalesce(1)
df_csv_single.write.csv("files/csv/person", header=True, mode="overwrite")

# The remaining formats can store the complex columns as they are
df_single_partition = df.coalesce(1)

# Save to JSON: the gzip-compressed and the plain variant go to separate
# directories, otherwise the second write would just replace the first.
df_single_partition.write.json("files/json/person_gzip", mode="overwrite", compression="gzip")
df_single_partition.write.json("files/json/person", mode="overwrite")

# Save to Parquet
df_single_partition.write.parquet("files/parquet/person", mode="overwrite", compression="snappy")

# Save to Avro - the Avro data source has to be loaded with the PySpark session.
# mode("overwrite") matches the other writers and keeps the cell re-runnable.
df_single_partition.write.format("avro").mode("overwrite").save("files/avro/person")

# Save to ORC
df_single_partition.write.orc("files/orc/person", mode="overwrite")


                                                                                

In [16]:
# Read CSV with default options
df = spark.read.csv("files/csv/person")
# Same read using format() and load()
df = spark.read.format("csv").load("files/csv/person")

# Read multiple files: pass a list of paths, one element per path.
# (A single comma-separated string is treated as one path.)
# "path1" .. "path3" are placeholders.
df = spark.read.csv(["path1", "path2", "path3"])

# Read all CSV files in a directory ("Folder_Path" is a placeholder)
df = spark.read.csv("Folder_Path")

# Read file with options
# PySpark reads all columns as a string (StringType) by default;
# inferSchema asks Spark to derive column types from the data instead.
df3 = (
    spark.read
    .format("csv")
    .option("header", True)
    .option("inferSchema", True)
    .option("delimiter", ",")  # option() takes a key and a value
    .load("files/csv/person")
)

# or pass several options at once as keyword arguments to options()
df4 = (
    spark.read
    .options(header='True', inferSchema='True', delimiter=',')
    .csv("files/csv/person")
)

In [None]:
# Write PySpark DataFrame to CSV file.
# "files/csv/person" is already written earlier in this notebook, and a write
# without a save mode raises when the target exists, so every write names its
# mode explicitly.
df.write.mode("overwrite").option("header", True) \
    .csv("files/csv/person")

df.write.mode("overwrite").options(header='True', delimiter=',') \
    .csv("files/csv/person")

# mode = overwrite / append / ignore (if file exists)
df.write.mode('overwrite').csv("files/csv/person")
# or use format and save
df.write.format("csv").mode('overwrite').save("files/csv/person")


In [None]:
# Read JSON

# Read a JSON file into a DataFrame
df = spark.read.json("resources/zipcodes.json")
df.printSchema()
df.show()

# Same read using format() and load()
df = spark.read.format("json").load("resources/zipcodes.json")

# Read a multiline JSON file (a record spans several lines)
multiline_df = (
    spark.read
    .option("multiline", "true")
    .json("resources/multiline-zipcode.json")
)

# Read multiple files, from different paths
df2 = spark.read.json(['folder/file1.json', 'resources/file2.json'])

# Read all JSON files from a folder
df3 = spark.read.json("resources/*.json")
df3.show()

# Reading files with a user-specified custom schema.
# NOTE(review): `schema` is whatever is bound at module level; in this notebook
# that is the person schema, which presumably does not describe
# zipcodes.json - confirm and substitute a matching schema.
df_with_schema = spark.read.schema(schema).json("resources/zipcodes.json")
df_with_schema.printSchema()

# Write PySpark DataFrame to JSON file.
# Save modes: overwrite, append, ignore, errorifexists (the default when no
# mode is given). A write without a mode fails once the target exists, i.e.
# on every rerun, so the first write uses "ignore": it writes when the target
# is missing and is a no-op otherwise.
df2.write.mode("ignore").json("/tmp/spark_output/zipcodes.json")

# Replace whatever is at the target
df2.write.mode("overwrite").json("/tmp/spark_output/zipcodes.json")

In [None]:
# Parquet 
# PySpark SQL automatically captures the schema of the original data.
# When a DataFrame is written to a Parquet file, the column names and their data types are preserved automatically.

# Read parquet file using read.parquet()
parDF = spark.read.parquet("files/parquet/person")

# The writes below use parDF (the person data just read): by this point in
# the notebook `df` has been rebound to other data sets, and the partition
# example needs the person columns.

# Write DataFrame to parquet file using write.parquet().
# An explicit mode keeps the cell re-runnable; without one the write fails
# when the target already exists.
parDF.write.mode("overwrite").parquet("/tmp/output/people.parquet")

# append adds new files next to existing ones; overwrite replaces the target
parDF.write.mode("append").parquet("path/to/parquet/file")
parDF.write.mode("overwrite").parquet("path/to/parquet/file")

# Create a partitioned Parquet data set.
# Partition by low-cardinality columns: each distinct value combination
# becomes its own directory, so a near-unique column such as salary would
# yield roughly one tiny directory per row.
parDF.write.partitionBy("Gender", "Dept").mode("overwrite").parquet("/tmp/output/people2.parquet")

# Read only the partition Gender=M (directory name uses the column's name
# as spelled in the schema)
parDF2 = spark.read.parquet("/tmp/output/people2.parquet/Gender=M")


In [None]:
# Hive
# PySpark writes the data to the default Hive warehouse location which is /user/hive/warehouse when you use a Hive cluster.
# By default it creates files in parquet format with snappy compression.

from os.path import abspath
from pyspark.sql import SparkSession

# Default location for managed databases and tables, as an absolute path
warehouse_location = abspath('spark-warehouse')

# Build a Hive-enabled session pointing at the warehouse directory and the
# remote metastore.
# NOTE(review): getOrCreate() hands back an already-active session if one
# exists; presumably Hive support and the warehouse setting then do not take
# effect - confirm, and run this cell on a fresh kernel if so.
spark = (
    SparkSession.builder
    .appName("read_write_Hive")
    .config("spark.sql.warehouse.dir", warehouse_location)
    .config("hive.metastore.uris", "thrift://remote-host:9083")
    .enableHiveSupport()
    .getOrCreate()
)

# Create a small sample DataFrame (column types are inferred from the values)
data = [(1, "James", 30, "M"), (2, "Ann", 40, "F"),
        (3, "Jeff", 41, "M"), (4, "Jennifer", 20, "F")]
columns = ["id", "name", "age", "gender"]

sampleDF = spark.createDataFrame(data, columns)

# Create Hive Internal (managed) table in the current database
sampleDF.write.mode('overwrite') \
    .saveAsTable("employee")

# Save the DataFrame as a Hive table with various options
df.write.option("compression", "gzip").mode("overwrite").saveAsTable("my_hive_table")

# Save the DataFrame as a Hive table with format and additional options
df.write.format("parquet").option("compression", "snappy").mode("append").saveAsTable("my_hive_table_2")

# Read Hive table
df = spark.read.table("employee")
# or
# Read Hive table by SQL
df = spark.sql("select * from employee")
df.show()

# Create the database first: the saveAsTable("emp....") calls below need it
# to exist.
spark.sql("CREATE DATABASE IF NOT EXISTS emp")

# Create Hive Internal table inside the emp database
sampleDF.write.mode('overwrite') \
    .saveAsTable("emp.employee")

# Create Hive External table: the "path" option stores the data at the given
# location. This replaces the internal emp.employee written just above.
sampleDF.write.mode('overwrite') \
    .option("path", "/path_to_external/table") \
    .saveAsTable("emp.employee")

# Create table and load data ('path/file.txt' is a placeholder)
spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")
spark.sql("LOAD DATA LOCAL INPATH 'path/file.txt' INTO TABLE src")


In [None]:
# Delta Tables

# Delta Lake is an open-source storage layer that provides a solution for managing structured and semi-structured data in big data environments.
# In Databricks, Delta Tables are tables stored in Delta Lake format.
# Delta Tables enable advanced capabilities such as schema enforcement, data versioning, and transactional processing.

# Specify the Delta table path (placeholder)
delta_table_path = "/path/to/delta-table"

# Save the DataFrame as a Delta table, then append the same rows again
df.write.format("delta").mode("overwrite").save(delta_table_path)
df.write.format("delta").mode("append").save(delta_table_path)

# Read data from a Delta table into a DataFrame
df_read = spark.read.format("delta").load(delta_table_path)

# Update / delete rows in a Delta table.
# A DataFrame is an immutable query result and has no update() or delete()
# methods; rows are changed by running DML against the table itself. The SQL
# form is used here because it needs nothing beyond spark.sql().
update_condition = "Name = 'John'"
spark.sql(f"UPDATE delta.`{delta_table_path}` SET Age = 30 WHERE {update_condition}")

delete_condition = "Name = 'Alice'"
spark.sql(f"DELETE FROM delta.`{delta_table_path}` WHERE {delete_condition}")

# Time Travel in Delta Tables:

# List all available versions of the Delta table (one row per commit)
df_history = spark.sql(f"DESCRIBE HISTORY delta.`{delta_table_path}`")
df_version = df_history  # second name for the same history DataFrame
df_history.show(truncate=False)
display(df_history)

# Querying/Read data from a Delta Table as of a specific version
# Use the option("versionAsOf", version_number) to query the Delta Table at different versions.
version_number = 1
df_as_of_version = (
    spark.read.format("delta")
    .option("versionAsOf", version_number)
    .load(delta_table_path)
)
df_as_of_version.show()

# Schema Evolution in Delta Tables:
# Schema evolution allows columns to be added or changed over time without a
# full rewrite of the table. option("mergeSchema", "true") lets the table
# schema be updated to match the schema of the DataFrame being written.
df.write.format("delta").option("mergeSchema", "true").mode("append").save(delta_table_path)

# Vacuum a Delta table: reclaim storage by removing data files that are no
# longer referenced by recent versions.
# RETAIN num HOURS: minimum age a file must have before it can be removed.
# DRY RUN: only list what would be deleted, without deleting anything.
# NOTE(review): a "RETAIN num VERSIONS" form could not be confirmed - check
# the Delta documentation for the version in use.
# Delta's safety check rejects a retention shorter than the default 7 days
# (168 hours) unless spark.databricks.delta.retentionDurationCheck.enabled is
# set to false, so the default is used here.
retention_hours = "168"
retained_files = spark.sql(f"VACUUM delta.`{delta_table_path}` RETAIN {retention_hours} HOURS DRY RUN")

In [None]:
# JDBC is a Java standard to connect to any database as long as you provide the right JDBC connector jar in the classpath and provide a JDBC driver using the JDBC API. 
# PySpark also leverages the same JDBC standard when using jdbc() method.

# To query a database table using jdbc() method, you would need the following.

# Server IP or Host name and Port,
# Database name,
# Table name,
# User and Password.

# Create (or fetch) the SparkSession, with the MySQL connector jar on spark.jars.
# NOTE(review): getOrCreate() returns an already-active session if there is
# one; presumably the spark.jars setting is then not applied - confirm.
spark = (
    SparkSession.builder
    .appName('queryJDBC')
    .config("spark.jars", "mysql-connector-java-8.x.xx.jar")
    .getOrCreate()
)

# Connection settings shared by the reads in this cell.
# Credentials come from the environment; the "root" fallbacks are for a local
# demo database only.
jdbc_url = "jdbc:mysql://localhost:3306/emp"
jdbc_driver = "com.mysql.cj.jdbc.Driver"
jdbc_user = os.environ.get("MYSQL_USER", "root")
jdbc_password = os.environ.get("MYSQL_PASSWORD", "root")

# Query table using jdbc()
df = spark.read.jdbc(
    jdbc_url,
    "employee",
    properties={"user": jdbc_user, "password": jdbc_password, "driver": jdbc_driver},
)

# show DataFrame
df.show()

# or by using DataFrameReader.format("jdbc").load()
# Query from MySQL Table
df = (
    spark.read
    .format("jdbc")
    .option("url", jdbc_url)
    .option("driver", jdbc_driver)
    .option("dbtable", "employee")
    .option("user", jdbc_user)
    .option("password", jdbc_password)
    .load()
)

# SQL Query Specific Columns of JDBC Table
# The reads above pull the entire table. To fetch only some columns/rows,
# replace the "dbtable" option with a "query" option, e.g.
# .option("query", "select id,age from employee where gender='M'")

# Read MySQL Table in Parallel
# Every JDBC read needs the connection options (url, driver, user, password).
# numPartitions splits the read only when partitionColumn, lowerBound and
# upperBound are given too, and partitionColumn cannot be combined with
# "query" - so the query is passed as an aliased subquery through "dbtable".
# lowerBound/upperBound only decide how the id range is divided among
# partitions (they do not filter rows); adjust them to the real id range.
# fetchsize sets how many rows are fetched per round trip.
df = (
    spark.read
    .format("jdbc")
    .option("url", jdbc_url)
    .option("driver", jdbc_driver)
    .option("user", jdbc_user)
    .option("password", jdbc_password)
    .option("dbtable", "(select id,age from employee where gender='M') AS employee_m")
    .option("partitionColumn", "id")
    .option("lowerBound", 1)
    .option("upperBound", 1000)
    .option("numPartitions", 5)
    .option("fetchsize", 20)
    .load()
)

# Write to MySQL Table.
# The employee table already exists (it is read earlier in this cell), and a
# write without a save mode raises when the target exists. "append" adds the
# rows to the existing table; use "overwrite" to replace its contents instead.
# Credentials come from the environment; the "root" fallbacks are for a local
# demo database only.
sampleDF.write \
  .format("jdbc") \
  .option("driver", "com.mysql.cj.jdbc.Driver") \
  .option("url", "jdbc:mysql://localhost:3306/emp") \
  .option("dbtable", "employee") \
  .option("user", os.environ.get("MYSQL_USER", "root")) \
  .option("password", os.environ.get("MYSQL_PASSWORD", "root")) \
  .mode("append") \
  .save()