# PySpark Environment

In [None]:
# Configure the environment variables PySpark reads.
# The paths point into the conda environment at
# /Users/rmontecino/anaconda3/envs/pyspark-env
import os

os.environ.update({
    'SPARK_HOME': '/Users/rmontecino/anaconda3/envs/pyspark-env/lib/python3.12/site-packages/pyspark',
    'PYSPARK_DRIVER_PYTHON': 'jupyter',
    'PYSPARK_DRIVER_PYTHON_OPTS': 'notebook',
    'PYSPARK_PYTHON': 'python3',
})

In [None]:
# Import PySpark and initialize SparkSession
from pyspark.sql import SparkSession

In [None]:
# Get or create the SparkSession used by the rest of the notebook,
# registered under the application name 'SparkSQL'
spark = (
    SparkSession.builder
    .appName('SparkSQL')
    .getOrCreate()
)

In [None]:
# Install matplotlib (imported further down as matplotlib.pyplot
# to draw the bar chart)
%pip install matplotlib

# Read CSV file into DataFrame

## Read CSV with header

In [None]:
# Load ../data/products.csv into a DataFrame.
# The first line of the file is treated as the header and the
# column types are inferred from the data.
csv_file = '../data/products.csv'
df = spark.read.options(header=True, inferSchema=True).csv(csv_file)

# Print the inferred schema
df.printSchema()

# Display the first 20 rows
df.show(n=20)

## Read CSV with an explicit schema definition

In [None]:
# Spark SQL types needed to describe the columns
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType

# Build the schema one field at a time: (column name, type, nullable)
# NOTE(review): Spark may still report these columns as nullable when the
# schema is applied to a file source - confirm against printSchema() output.
schema = (
    StructType()
    .add(StructField('product_id', IntegerType(), False))
    .add(StructField('product_name', StringType(), False))
    .add(StructField('product_category_id', IntegerType(), False))
    .add(StructField('product_description', StringType(), True))
    .add(StructField('product_price', FloatType(), False))
)

# Re-read the same CSV file, this time applying the explicit schema
# instead of inferring the column types
df = spark.read.schema(schema).csv(csv_file, header=True)

# Print the resulting schema
df.printSchema()

# Display the first 20 rows
df.show(n=20)

# Read JSON file into DataFrame

## Single-line JSON

In [None]:
# Load newline-delimited JSON: every line of the file holds one
# complete JSON record
json_file = '../data/products_singleline.json'
df = spark.read.json(path=json_file)

# Print the schema Spark derived from the records
df.printSchema()

# Display the first 20 rows
df.show(n=20)

## Multi-line JSON

In [None]:
# Load a JSON file whose records span several lines.
# The multiLine option must be enabled on the reader for this layout.
json_file = '../data/products_multiline.json'
df = spark.read.options(multiLine=True).json(json_file)

# Print the schema Spark derived from the records
df.printSchema()

# Display the first 20 rows
df.show(n=20)

# DataFrame Operations

## Loading the synthetic data into a DataFrame

In [None]:
# Read the synthetic data set with the CSV reader.
# The file has a header row; column types are inferred.
synthetic_file = '../data/stocks.txt'
df = spark.read.options(header=True, inferSchema=True).csv(synthetic_file)

# Print the inferred schema
df.printSchema()

# Display the first 20 rows
df.show(n=20)

## Select: Choose specific columns

In [None]:
# Keep only the name, category and price columns and display them
df.select(['name', 'category', 'price']).show()

## Filter: Apply conditions to filter rows

In [None]:
# Use the filter method to keep only the rows whose price exceeds 100
df.filter(df.price > 100).show()

## GroupBy: Group data based on specific columns

In [None]:
# Count how many rows fall into each category
(
    df.groupBy('category')
    .count()
    .show()
)

# Aggregate within each group: here the average price per category
# (the same dict form accepts other aggregations such as sum, max, min)
(
    df.groupBy('category')
    .agg({'price': 'avg'})
    .show()
)

## Join: Combine multiple DataFrames based on specified columns

In [None]:
# Build the second DataFrame for the join by filtering the original one
# down to the rows with a price above 100
df2 = df.filter(df.price > 100)

# Inner-join the original DataFrame with the filtered one on 'category'
# NOTE(review): df2 has the same columns as df, so the non-key columns
# presumably appear twice in the output - confirm.
df.join(other=df2, on='category', how='inner').show()


## WithColumn: Add new calculated columns

In [None]:
# Derive a 'price_after_tax' column as price multiplied by 1.1
df.withColumn(colName='price_after_tax', col=df.price * 1.1).show()

## Creating a barchart

In [None]:
# Draw a bar chart of the product prices
# matplotlib is needed to display the figure
import matplotlib.pyplot as plt

# The plot is drawn with pandas, so convert the Spark DataFrame first
df_pandas = df.toPandas()

# One bar per row: 'name' along the x-axis, 'price' as the bar height
df_pandas.plot.bar(x='name', y='price')
plt.show()

# Turn this into a function
def show_barchart(df, x='name', y='price'):
    """Display a bar chart of one column against another.

    Args:
        df: Spark DataFrame containing the columns named by ``x`` and ``y``.
        x: Name of the column used for the bar labels. Defaults to 'name'.
        y: Name of the column used for the bar heights. Defaults to 'price'.
    """
    # Collect only the two plotted columns: toPandas() brings the data into
    # local memory, so there is no reason to pull the unused columns along.
    df_pandas = df.select(x, y).toPandas()
    df_pandas.plot(kind='bar', x=x, y=y)
    plt.show()

# Test the function by calling it on the current DataFrame
show_barchart(df)