# 1. Initializing SparkSession
The first step in any PySpark application is to create a SparkSession, which is the entry point to programming Spark with the Dataset and DataFrame API.

In [13]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master("local[1]") \
    .appName("SparkByExamples.com") \
    .getOrCreate()


This code initializes a SparkSession that runs locally on a single thread (local[1]) and names the application "SparkByExamples.com".

# 2. Creating RDDs

Using parallelize():
This method creates an RDD from a list object in Python.

In [14]:
# Create RDD from parallelize
data = [1,2,3,4,5,6,7,8,9,10,11,12]
rdd = spark.sparkContext.parallelize(data)


In [15]:
# Define the content of your file
text_content = """Hello, this is a test file.
This file is used for creating an RDD.
Each line will be an element in the RDD."""

# Specify the path and name of your file
file_path = "textFile.txt"

# Write the content to the file
with open(file_path, "w") as file:
    file.write(text_content)

# Now you can use this file path in your SparkContext.textFile() method


In [16]:
# Create RDD from an external data source
# Replace "textFile.txt" with the actual path to the file you want to use
rdd2 = spark.sparkContext.textFile("textFile.txt")


# 3. RDD Operations
Transformations and Actions:
Transformations create a new RDD from an existing one. Examples include map, filter, flatMap, etc.
Actions return a value after computing a result from an RDD. Examples include count, first, collect, etc.

In [27]:
# Example of transformation and action
rdd = spark.sparkContext.parallelize([1,2,3,4,5]) 
rddFiltered = rdd.filter(lambda x: x%2 == 0) # Transformation 
print(rddFiltered.collect()) # Action, outputs: [2, 4]


[2, 4]


In [26]:
# Another example in this cell
# Create a list of data
data = [1, 2, 3, 4, 5]

# Create an RDD from the list
distData = spark.sparkContext.parallelize(data)

# Print the RDD
print(distData.collect())

[1, 2, 3, 4, 5]


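RDD Persistence:
cache() keeps an RDD in memory, and persist() additionally lets you choose another storage level. In both cases the data is stored the first time an action computes it, so later actions can reuse it instead of recomputing it.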

In [28]:
rdd.cache() # Cache the RDD in memory


ParallelCollectionRDD[20] at readRDDFromFile at PythonRDD.scala:289

# 4. Converting RDDs to DataFrames and vice-versa:

In [29]:
# Convert RDD to DataFrame
# This fails: toDF() needs row-like elements (tuples, lists, or Row objects), not bare ints
dfFromRDD = rdd.toDF(["numbers"])


PySparkTypeError: [CANNOT_INFER_SCHEMA_FOR_TYPE] Can not infer schema for type: `int`.

In [23]:
# Convert DataFrame to RDD
# This fails when dfFromRDD has not been created successfully beforehand
rddFromDF = dfFromRDD.rdd


NameError: name 'dfFromRDD' is not defined

In [30]:
# 'rdd' is the existing RDD of integers
# Map each integer to a one-element tuple so that Spark can infer a schema
rdd_tuples = rdd.map(lambda x: (x,))

# Now convert to DataFrame
dfFromRDD = rdd_tuples.toDF(["numbers"])


In [31]:
from pyspark.sql import Row

# Map each integer to a Row object with a 'numbers' field
rdd_rows = rdd.map(lambda x: Row(numbers=x))

# Convert to DataFrame
dfFromRDD = spark.createDataFrame(rdd_rows)


# Basic DataFrame Operations
- **Show DataFrame**
To display the contents of the DataFrame, you can use the .show() method, which prints the DataFrame rows in a tabular form.

In [32]:
dfFromRDD.show()


+-------+
|numbers|
+-------+
|      1|
|      2|
|      3|
|      4|
|      5|
+-------+



- **Print Schema**
The .printSchema() method displays the schema of the DataFrame, showing the column names and data types.

In [33]:
dfFromRDD.printSchema() 

root
 |-- numbers: long (nullable = true)



# DataFrame Transformations
Transformations on DataFrames return a new DataFrame and are lazily evaluated, similar to RDD transformations.

- **Select Columns**
You can select specific columns from a DataFrame using the .select() method.

In [34]:
dfSelected = dfFromRDD.select("numbers")
dfSelected.show() 

+-------+
|numbers|
+-------+
|      1|
|      2|
|      3|
|      4|
|      5|
+-------+



- **Filter Rows**
Filter rows based on a condition using the .filter() or .where() method.

In [38]:
dfFiltered = dfFromRDD.filter(dfFromRDD.numbers > 3)  # or .where(dfFromRDD.numbers > 3)
dfFiltered.show()


+-------+
|numbers|
+-------+
|      4|
|      5|
+-------+



- **Add New Column**
Add a new column to the DataFrame using the .withColumn() method.

In [41]:
from pyspark.sql.functions import col

dfWithNewColumn = dfFromRDD.withColumn("numbers_times_two", col("numbers") * 2)
dfWithNewColumn.show()


+-------+-----------------+
|numbers|numbers_times_two|
+-------+-----------------+
|      1|                2|
|      2|                4|
|      3|                6|
|      4|                8|
|      5|               10|
+-------+-----------------+



# DataFrame Actions 
Actions on a DataFrame trigger the execution of the computation and return results to the driver or write to storage.

- **Collect**
Collect the result rows to the driver as a list of Row objects.

In [43]:
rows = dfFromRDD.collect()
print(rows)


[Row(numbers=1), Row(numbers=2), Row(numbers=3), Row(numbers=4), Row(numbers=5)]


- **Count**
Return the number of rows in the DataFrame.

In [44]:
rowCount = dfFromRDD.count()
print(rowCount)


5


- **Save to Files**
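Save the DataFrame to external storage, for example in CSV format. Spark writes a directory of part files rather than a single file.

In [46]:
# Creates a directory named "numbers.csv"; raises an error if that path already exists
dfFromRDD.write.csv("numbers.csv")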

# Joining DataFrames
If you have another DataFrame and want to join it with dfFromRDD based on a common column, you can use the .join() method.

#### Define dfOther DataFrame
First, let's create a DataFrame named dfOther with an id column and a value column. For simplicity, we create it from a list of tuples, each representing one row of the DataFrame.

In [49]:
# Assumes the SparkSession is already initialized as 'spark'

# Sample data for dfOther
dataOther = [(1, "A"), (2, "B"), (3, "C"), (4, "D"), (5, "E")]

# Create a DataFrame from the data
dfOther = spark.createDataFrame(dataOther, ["id", "value"])

# Show the DataFrame to verify its creation
dfOther.show()


+---+-----+
| id|value|
+---+-----+
|  1|    A|
|  2|    B|
|  3|    C|
|  4|    D|
|  5|    E|
+---+-----+



# Perform the Join Operation
Now that dfOther is defined and has an id column, you can perform the join operation with dfFromRDD as intended. Make sure both DataFrames have matching column types for the join to work correctly. Assuming dfFromRDD has a numbers column that you want to join on the id column of dfOther:

In [52]:
# Perform the join operation
dfJoined = dfFromRDD.join(dfOther, dfFromRDD.numbers == dfOther.id)

# Show the result of the join
dfJoined.show() 

+-------+---+-----+
|numbers| id|value|
+-------+---+-----+
|      1|  1|    A|
|      2|  2|    B|
|      3|  3|    C|
|      4|  4|    D|
|      5|  5|    E|
+-------+---+-----+



# GroupBy and Aggregate Functions
Grouping data by one or more columns and then applying an aggregate function is a common operation in data analysis. PySpark DataFrames support a variety of aggregate functions such as count(), sum(), avg(), max(), and min().

In [54]:
from pyspark.sql.functions import col

# Assuming dfFromRDD is your initial DataFrame and it has a column named 'numbers'
dfFromRDD = dfFromRDD.withColumn("numbers_times_two", col("numbers") * 2) 

In [55]:
from pyspark.sql import functions as F

# Now perform the grouping and aggregation
dfGrouped = dfFromRDD.groupBy("numbers").agg(F.avg("numbers_times_two").alias("average_times_two"))
dfGrouped.show()


+-------+-----------------+
|numbers|average_times_two|
+-------+-----------------+
|      5|             10.0|
|      1|              2.0|
|      3|              6.0|
|      2|              4.0|
|      4|              8.0|
+-------+-----------------+



# Advanced Data Analysis and Machine Learning

#### Using Window Functions for Advanced Analytics:

- Window functions perform calculations across a set of rows related to the current row without collapsing those rows into one. Typical uses include moving averages and ranking rows within groups.

#### Example: Calculating Moving Averages and Ranking
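This example assumes a DataFrame with the columns listed below. dfFromRDD has only numbers and numbers_times_two, so the cells in this section that reference category, timestamp, or value against it raise an AnalysisException. The assumed structure is: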

- A **category** column to partition data,
- A **timestamp** or similar column to order data,
- A **value** column on which we want to perform calculations.

Our goal is to calculate a moving average of value over a specified window and rank rows based on value within each category.

Step 1: Import Required Functions

In [58]:
from pyspark.sql.window import Window
from pyspark.sql import functions as F


Step 2: Define Window Specifications
For Moving Average: Define a window partitioned by category, ordered by timestamp, with a row-based frame that covers the current row and the two rows before it.

For Ranking: Define a window partitioned by category and ordered by value in descending order to rank the rows based on value.

In [59]:
# Window specification for moving average
windowSpecAvg = Window.partitionBy("category").orderBy("timestamp").rowsBetween(-2, 0)

# Window specification for ranking
windowSpecRank = Window.partitionBy("category").orderBy(F.desc("value"))


Step 3: Apply Window Functions

In [63]:
# Redefine the window specification for the moving average
# This requires 'category' and 'timestamp' columns; dfFromRDD has neither, so replace them with your actual partitioning and ordering columns
windowSpecAvg = Window.partitionBy("category").orderBy("timestamp").rowsBetween(-2, 0)

# Calculate moving average for 'numbers_times_two'
dfWithMovingAvg = dfFromRDD.withColumn("moving_avg", F.avg("numbers_times_two").over(windowSpecAvg))

dfWithMovingAvg.show()


AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `category` cannot be resolved. Did you mean one of the following? [`numbers`, `numbers_times_two`].;
'Project [numbers#2L, numbers_times_two#150L, avg(numbers_times_two#150L) windowspecdefinition('category, 'timestamp ASC NULLS FIRST, specifiedwindowframe(RowFrame, -2, currentrow$())) AS moving_avg#179]
+- Project [numbers#2L, (numbers#2L * cast(2 as bigint)) AS numbers_times_two#150L]
   +- LogicalRDD [numbers#2L], false


In [64]:
# Calculate moving average
dfWithMovingAvg = dfFromRDD.withColumn("moving_avg", F.avg("value").over(windowSpecAvg)) 

AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `value` cannot be resolved. Did you mean one of the following? [`numbers`, `numbers_times_two`].;
'Project [numbers#2L, numbers_times_two#150L, avg('value) windowspecdefinition('category, 'timestamp ASC NULLS FIRST, specifiedwindowframe(RowFrame, -2, currentrow$())) AS moving_avg#181]
+- Project [numbers#2L, (numbers#2L * cast(2 as bigint)) AS numbers_times_two#150L]
   +- LogicalRDD [numbers#2L], false


In [62]:
# Rank rows within each category based on value
dfWithRank = dfWithMovingAvg.withColumn("rank", F.rank().over(windowSpecRank))


NameError: name 'dfWithMovingAvg' is not defined

In [65]:
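# Window specification for cumulative sum, ordered by 'numbers' for demonstration
# Without partitionBy, Spark moves all rows into a single partition, which is fine here but slow on large data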
windowSpecCumSum = Window.orderBy("numbers").rowsBetween(Window.unboundedPreceding, Window.currentRow)

# Calculate cumulative sum for 'numbers'
dfWithCumSum = dfFromRDD.withColumn("cumulative_sum", F.sum("numbers").over(windowSpecCumSum))

dfWithCumSum.show()


+-------+-----------------+--------------+
|numbers|numbers_times_two|cumulative_sum|
+-------+-----------------+--------------+
|      1|                2|             1|
|      2|                4|             3|
|      3|                6|             6|
|      4|                8|            10|
|      5|               10|            15|
+-------+-----------------+--------------+



# 1. Advanced Window Function: Calculating Group-Specific Moving Averages
Let's simulate a scenario where your DataFrame includes sales data with multiple stores (categories), dates, and sales amounts. The goal is to calculate a 7-day moving average of sales for each store.

Note: This example is hypothetical. Adjust your DataFrame creation or modification steps according to your actual data structure.

Sample Data Preparation
First, let's create a DataFrame representing our sales data.


In [66]:
from pyspark.sql import Row
import datetime

# Example sales data: store, date, sales
sales_data = [
    Row(store="A", date=datetime.date(2020, 1, 1), sales=100),
    Row(store="A", date=datetime.date(2020, 1, 2), sales=150),
    # Add more rows for each store and date
]

# Create DataFrame
dfSales = spark.createDataFrame(sales_data)

# Assuming dfSales has 'store', 'date', and 'sales' columns
dfSales.show()


+-----+----------+-----+
|store|      date|sales|
+-----+----------+-----+
|    A|2020-01-01|  100|
|    A|2020-01-02|  150|
+-----+----------+-----+



Calculating Moving Averages

In [67]:
from pyspark.sql.window import Window
from pyspark.sql.functions import avg

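# Define the window specification: the current row plus the 6 preceding rows per store
windowSpec = Window.partitionBy("store").orderBy("date").rowsBetween(-6, 0)

# Calculate the moving average of sales for each store
# (this spans exactly 7 days only when every store has one row per day)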
dfMovingAvg = dfSales.withColumn("7_day_avg_sales", avg("sales").over(windowSpec))

dfMovingAvg.show()


+-----+----------+-----+---------------+
|store|      date|sales|7_day_avg_sales|
+-----+----------+-----+---------------+
|    A|2020-01-01|  100|          100.0|
|    A|2020-01-02|  150|          125.0|
+-----+----------+-----+---------------+



# 2. Ranking Data Within Groups
Using the same sales DataFrame, we might want to rank stores by their sales on each day.

In [68]:
from pyspark.sql.functions import rank

# Define window spec for ranking
windowSpecRank = Window.partitionBy("date").orderBy(dfSales.sales.desc())

# Rank stores by sales within each date
dfRanked = dfSales.withColumn("rank", rank().over(windowSpecRank))

dfRanked.show() 

+-----+----------+-----+----+
|store|      date|sales|rank|
+-----+----------+-----+----+
|    A|2020-01-01|  100|   1|
|    A|2020-01-02|  150|   1|
+-----+----------+-----+----+



# 3. Integration with Machine Learning
After performing data preprocessing and feature engineering using window functions, you can use PySpark MLlib to build predictive models. Let's say you want to predict future sales based on historical data.

Feature Engineering
The feature vectors below are built from a 7_day_avg_sales column. The earlier moving-average cell stored that column in dfMovingAvg rather than dfSales, so Step 1 adds it to dfSales before the features are assembled.

In [71]:
#!pip install numpy 

## Step 1: Calculate the 7_day_avg_sales Column
Make sure the 7_day_avg_sales column is actually added to dfSales. The cell below computes it as a moving average:

In [74]:
from pyspark.sql.window import Window
from pyspark.sql import functions as F

# Define the window specification for moving average calculation
windowSpec = Window.partitionBy("store").orderBy("date").rowsBetween(-6, 0)

# Calculate the moving average and add as a new column
dfSales = dfSales.withColumn("7_day_avg_sales", F.avg("sales").over(windowSpec)) 

## Step 2: Use the VectorAssembler
After confirming that dfSales now includes the 7_day_avg_sales column, proceed with the VectorAssembler:

In [76]:
from pyspark.ml.feature import VectorAssembler

# Assuming dfSales has been correctly prepared with a '7_day_avg_sales' column

# Assemble features
assembler = VectorAssembler(inputCols=["7_day_avg_sales"], outputCol="features")
dfFeatures = assembler.transform(dfSales)

# Select the feature vector and rename 'sales' to 'label'
dfModelData = dfFeatures.select("features", dfFeatures.sales.alias("label")) 

## Step 4: Train a Machine Learning Model

We will use a simple Linear Regression model as an example. This model will try to predict the sales (label) based on the 7_day_avg_sales (features) you've prepared.

- **4.1 Split the Data**

Split the data into training and test datasets to evaluate the model's performance on unseen data.

In [77]:
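# Split the data into roughly 80% training and 20% test rows (rows are assigned at random)
# With only two sample rows, either split can end up empty; use a larger dataset in practice
(trainingData, testData) = dfModelData.randomSplit([0.8, 0.2])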


In [78]:
from pyspark.ml.regression import LinearRegression

# Initialize the Linear Regression model
lr = LinearRegression(featuresCol="features", labelCol="label")

# Train the model
lrModel = lr.fit(trainingData)


## Step 5: Make Predictions and Evaluate the Model
Use the trained model to make predictions on the test data and evaluate the model's performance.

- **5.1 Make Predictions**

In [80]:
# Make predictions on the test data
predictions = lrModel.transform(testData)

# Show some predictions
predictions.select("prediction", "label", "features").show(5)


+----------+-----+--------+
|prediction|label|features|
+----------+-----+--------+
|     150.0|  100| [100.0]|
+----------+-----+--------+



- **5.2 Evaluate the Model**
Evaluate the model with RegressionEvaluator, using a metric such as RMSE (Root Mean Squared Error) to quantify the prediction error.

In [82]:
from pyspark.ml.evaluation import RegressionEvaluator

# Select (prediction, true label) and compute test error
evaluator = RegressionEvaluator(
    labelCol="label", predictionCol="prediction", metricName="rmse")

rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)


Root Mean Squared Error (RMSE) on test data = 50
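To see where the value 50 comes from, the RMSE can be recomputed by hand in plain Python from the single test row shown in the predictions table (prediction 150.0, label 100). This is only an arithmetic check, not how the evaluator is implemented.

```python
import math

# (prediction, label) pairs taken from the predictions table above.
pairs = [(150.0, 100.0)]

# RMSE = square root of the mean of the squared errors.
squared_errors = [(prediction - label) ** 2 for prediction, label in pairs]
rmse = math.sqrt(sum(squared_errors) / len(squared_errors))

print(rmse)
```

With one test row, the RMSE is simply the absolute error of that row, so this number says very little about how the model would perform on a realistic dataset.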
