# Spark DataFrame - Basics

Let's start with the fundamentals of Spark DataFrames.

Objective: In this exercise, you'll learn how to start a Spark session, read in data, explore the data, and manipulate it (using both DataFrame syntax and SQL syntax). Let's get started!

In [None]:
# Setup cell: every new notebook needs this. Give each notebook its own app name.
import findspark

# Point findspark at the local Spark installation so that `pyspark` can be imported.
findspark.init('/home/ubuntu/spark-3.2.1-bin-hadoop2.7')

import pyspark
from pyspark.sql import SparkSession

# getOrCreate() hands back an already-active session rather than starting another one.
spark = (
    SparkSession.builder
    .appName('basics')
    .getOrCreate()
)

In [None]:
# Import necessary Python libraries
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator

In [None]:
# Load the people dataset. The input file is JSON, so use the "json" source format.
df = spark.read.format('json').load('Datasets/people.json')

## Data Exploration

In [None]:
# show() prints the DataFrame's rows (up to 20) as a text table; two columns are visible here.
df.show(20)

# The column names are also available as a plain Python list.
df.columns

In [None]:
# describe() computes general summary statistics (count, mean, stddev, min, max).
# It returns a DataFrame, so it still has to be shown.
# It does not tell us the column data types, though; see the next cell for that.
df.describe().show(truncate=True)

In [None]:
# To see column types, use printSchema(); it lists each column's type and nullability.
# Here age comes out as long. What if we want it as an integer instead?
# The next section answers that by supplying an explicit schema when reading.
df.printSchema()

## Data Manipulation

In [None]:
# Let's import in the relevant types.
from pyspark.sql.types import (StructField,StringType,IntegerType,StructType)

In [None]:
# Describe the desired structure explicitly: "age" as a nullable integer and
# "name" as a nullable string.
data_schema = [
    StructField('age', IntegerType(), nullable=True),
    StructField('name', StringType(), nullable=True),
]

# Wrap the field list in a StructType so it can be passed to the reader.
final_struct = StructType(fields=data_schema)

In [None]:
# Re-read the JSON file using the explicit schema instead of letting Spark infer it.
df = spark.read.schema(final_struct).json('Datasets/people.json')

# The printed schema now shows age as an integer.
df.printSchema()

In [None]:
# select() returns a new DataFrame containing only the requested column(s).
df.select(df['age']).show()

# Equivalent two-step form: keep the selection in a variable, then show it.
# The output is identical.
ageColumn = df.select('age')
ageColumn.show()

In [None]:
# withColumn() returns a NEW DataFrame with an extra column derived from "age".
df.withColumn('double_age', col('age') * 2).show()

# The original DataFrame is unchanged: "double_age" is not part of df.
# To keep the new column, assign the result of withColumn() to a variable.
df.show()

In [None]:
# Columns can be renamed, too. Like withColumn(), this returns a new DataFrame
# and leaves df itself untouched.
df.withColumnRenamed(existing='age', new='my_new_age').show()

## Introducing SQL
We can query a DataFrame as if it were a table! Let's see a few examples of that below:

In [None]:
# First, register the DataFrame as a temporary SQL view named "people".
df.createOrReplaceTempView('people')

# The view can now be queried with SQL. spark.sql() returns a DataFrame,
# so call show() to actually display the query result.
results = spark.sql("SELECT * FROM people")
results.show()

In [None]:
# A second query, this time filtering with a WHERE clause
# (keeps only people aged 19 or older).
results = spark.sql(sqlQuery="SELECT age FROM people WHERE age >= 19")
results.show()

In [None]:
# Bring the Spark DataFrame to the driver as a pandas DataFrame so matplotlib can plot it.
df_pandas = df.toPandas()

# Histogram of ages using 20 bins, drawn via the object-oriented matplotlib API.
fig, ax = plt.subplots(figsize=(10, 5))
ax.hist(df_pandas['age'], bins=20, color='skyblue')
ax.set_title('Age Distribution')
ax.set_xlabel('Age')
ax.set_ylabel('Count')
plt.show()


In [None]:
# Paths of the CSV files to load; every file lives in the "Datasets/" folder.
file_names = [
    "Datasets/" + csv_name
    for csv_name in (
        "2022 Policy Strength.csv",
        "2022 Ocean Science Funding.csv",
        "2022 Marine Key Biodiversities Areas Percentage.csv",
        "2018 Sustainable Fish Stock.csv",
        "1969 - 2018 capture-fisheries-vs-aquaculture.csv",
    )
]

In [None]:
# Read each CSV into its own DataFrame. The header row supplies the column
# names, and inferSchema asks Spark to detect numeric column types.
# A comprehension is used so the notebook-level `df` (the people DataFrame
# used by earlier cells) is not overwritten by a loop variable.
dfs = [
    spark.read.csv(file_name, header=True, inferSchema=True)
    for file_name in file_names
]

In [None]:
# Explore Data: preview the first 10 rows of each table, labelled with its source file.
# `table` is used instead of `df` so the people DataFrame `df` is not overwritten.
for file_name, table in zip(file_names, dfs):
    print(file_name)
    table.show(10)

In [None]:
# Data visualization for each table
# 1. 2022 Policy Strength: cast "Value" to an integer, drop any row with a
#    missing value (including values that failed the cast), then convert the
#    result to pandas for plotting.
df_0 = (
    dfs[0]
    .withColumn("Value", col("Value").cast(IntegerType()))
    .na.drop()
)
df_0_pd = df_0.toPandas()

In [None]:
# Plot with pandas' DataFrame.plot (an alternative to calling matplotlib directly).
# When no `ax` is passed, pandas creates its own figure. The size therefore has
# to be given via `figsize` here; a separate plt.figure() call would only leave
# behind an empty, extra figure.
ax = df_0_pd.plot(kind="bar", x="GeoAreaName", y="Value", rot=90, figsize=(10, 6))
ax.set_xlabel("Country")
ax.set_ylabel("Policy Strength")
ax.set_title("2022 Policy Strength")
plt.tight_layout()
plt.show()

In [None]:
# 2. 2022 Ocean Science Funding
# Cast to double rather than integer. If the allocation figures are fractional
# (shares of GDP usually are), an IntegerType cast truncates them (e.g. 0.45 -> 0)
# and flattens the bars. Entries that cannot be parsed become null and are
# removed by na.drop().
df_1 = dfs[1]
df_1 = df_1.withColumn("GDP", df_1["GDP"].cast("double"))
df_1 = df_1.na.drop()

# Convert to a pandas DataFrame for plotting
df_1_pd = df_1.toPandas()

# Horizontal bar chart with matplotlib: one bar per row, labelled by country
plt.figure(figsize=(10, 6))
plt.barh(df_1_pd["GeoAreaName"], df_1_pd["GDP"])
plt.xlabel("GDP Allocation")
plt.ylabel("Country")
plt.title("2022 Ocean Science Funding")
plt.tight_layout()
plt.show()

In [None]:
# 3. 2022 Marine Key Biodiversities Areas Percentage (scatter plot)
# Copy "MPA Percent" into a numeric "Percent" column. A double is used instead
# of an integer so fractional percentages (e.g. 45.7) are not truncated.
# Rows with any missing value, or with a value that failed the cast, are dropped.
df_2 = dfs[2]
df_2 = df_2.withColumn("Percent", df_2["MPA Percent"].cast("double"))
df_2 = df_2.na.drop()
df_2_pd = df_2.toPandas()

In [None]:
# Scatter plot drawn with matplotlib (not pandas): one point per row,
# with the country name on the x-axis.
fig, ax = plt.subplots(figsize=(10, 6))
ax.scatter(df_2_pd["GeoAreaName"], df_2_pd["Percent"])
ax.set_xlabel("Country")
ax.set_ylabel("Percentage")
ax.set_title("2022 Marine Key Biodiversities Areas Percentage")
# Rotate the country names so they don't overlap.
ax.tick_params(axis="x", labelrotation=90)
fig.tight_layout()
plt.show()

In [None]:
# 4. 2018 Sustainable Fish Stock
# Cast "Percentage" to double rather than integer so fractional percentages
# are kept rather than truncated. Rows with missing or unparseable values are dropped.
df_3 = dfs[3]
df_3 = df_3.withColumn("Percentage", df_3["Percentage"].cast("double"))
df_3 = df_3.na.drop()
df_3_pd = df_3.toPandas()

In [None]:
# Scatter plot drawn with matplotlib (not pandas): sustainability percentage
# per row, with the country name on the x-axis.
fig, ax = plt.subplots(figsize=(10, 6))
ax.scatter(df_3_pd["GeoAreaName"], df_3_pd["Percentage"])
ax.set_xlabel("Country")
ax.set_ylabel("Sustainability Rating")
ax.set_title("2018 Sustainable Fish Stock")
# Rotate the country names so they don't overlap.
ax.tick_params(axis="x", labelrotation=90)
fig.tight_layout()
plt.show()

In [None]:
# 5. 1969-2018 Capture Fisheries vs Aquaculture
# Cast the three numeric columns to integers, then drop every row that has a
# missing value. One na.drop() at the end is enough: a null stays null through
# the cast, so it catches both originally missing values and failed casts.
df_4 = (
    dfs[4]
    .withColumn("Year", col("Year").cast(IntegerType()))
    .withColumn("Capture", col("Capture").cast(IntegerType()))
    .withColumn("Aquaculture", col("Aquaculture").cast(IntegerType()))
    .na.drop()
)
df_4_pd = df_4.toPandas()

In [None]:
# This table has a Year column and an "Entity" column (later renamed to
# GeoAreaName and joined per country), i.e. several rows per year. Plotting
# the raw rows would draw one line that jumps back and forth in time between
# entities. Aggregate to a single total per year first (groupby also sorts by Year).
# NOTE(review): if the CSV also contains aggregate entities (e.g. a "World"
# row), summing double-counts them; filter to that entity instead.
yearly = df_4_pd.groupby("Year", as_index=False)[["Capture", "Aquaculture"]].sum()

plt.figure(figsize=(10, 6))
plt.plot(yearly["Year"], yearly["Capture"], color="red", label="Capture Fisheries")
plt.plot(yearly["Year"], yearly["Aquaculture"], color="blue", label="Aquaculture")
plt.xlabel("Year")
plt.ylabel("Tonnes")
plt.legend()
plt.title("Capture Fisheries vs Aquaculture (1969-2018)")
plt.tight_layout()
plt.show()

In [None]:
# Quality report for each dataframe: print its schema and summary statistics.
# `table` is used instead of `df` so the people DataFrame `df` is not overwritten.
for table_number, table in enumerate(dfs, start=1):
    print(f"Table {table_number}:")
    table.printSchema()
    table.describe().show()

# Data Preparation
# In this section, you can add data preprocessing and transformation steps as needed.
# For example, renaming columns, merging dataframes, handling missing values, and feature engineering.

# Model Building
# You can add machine learning model building and evaluation steps in this section.
# For example, building a classification or regression model to address the overfishing problem.

# Action Plan and Implementation
# Outline your plan for implementing the solution, monitoring, and continuous improvement.

# 07-DM
# Execute DM task (if applicable)

# 08-INT
# Summarize Results

# Add relevant tables or graphs

# 09-ACT
# Describe the Action Plan for Implementation, Observation, and Improvement

In [None]:
from pyspark.sql.functions import when

# 03-DP: Data Preparation

# Preview the first rows of the fish-stock table before the flag column is derived.
dfs[3].select(["GeoAreaName", "Percentage"]).show(n=5)

In [None]:
# Derive SDG14_4_1: 1 when Percentage is above 50, otherwise 0.
# A null Percentage does not satisfy the comparison, so it also maps to 0.
dfs[3] = dfs[3].withColumn(
    "SDG14_4_1",
    when(col("Percentage") > 50, 1).otherwise(0),
)

# Show the table again, now including the derived flag.
dfs[3].select("GeoAreaName", "Percentage", "SDG14_4_1").show(n=5)

In [None]:
# Data Cleaning
# Table 5 names its country column "Entity"; align it with the other tables.
dfs[4] = dfs[4].withColumnRenamed("Entity", "GeoAreaName")

# Outer-join every table onto the first one, keyed on "GeoAreaName".
# NOTE(review): table 5 has one row per year for each entity, so this join
# repeats each country's values from the other tables once per year. Columns
# shared by several tables (other than GeoAreaName) will also appear twice;
# confirm this is intended.
merged_df = dfs[0]
for other_table in dfs[1:]:
    merged_df = merged_df.join(other_table, on=["GeoAreaName"], how="outer")

# Preview the merged result.
merged_df.show(5)

In [None]:
# Keep only rows that have a value in every column
# (na.drop() with its defaults drops a row if any column is null).
merged_df = merged_df.na.drop()

# Report how many complete rows remain; the DataFrame itself is not displayed here.
num_rows_left = merged_df.count()
print("Number of rows left:", num_rows_left)

In [None]:
# 04-DT: Data Transformation

# Remove the "Code" column from merged_df (presumably a country code; it is not used as a feature).
merged_df = merged_df.drop("Code")

# SDG14_4_1 was built with when(...).otherwise(0), so it already holds the integers 0/1.
# Comparing it to `true` and mapping back to 1/0 would only reproduce the same
# values through an implicit int/boolean comparison. An explicit cast states
# the intent and guarantees the integer type.
merged_df = merged_df.withColumn("SDG14_4_1", merged_df["SDG14_4_1"].cast("int"))

# Display the modified merged table (first 5 rows)
merged_df.show(5)

In [None]:
# Keep only rows where the SDG14_4_1 flag is present.
filtered_df = merged_df.filter(col("SDG14_4_1").isNotNull())

# Histogram of the 0/1 SDG14_4_1 flag (the raw values; no log transform is applied).
# With bin edges [0, 0.5, 1], the first bin counts the 0s and the second the 1s.
import matplotlib.pyplot as plt
flag_values = [row[0] for row in filtered_df.select("SDG14_4_1").collect()]
plt.hist(flag_values, bins=[0, 0.5, 1], edgecolor='black')
plt.xlabel("(SDG14_4_1)")
plt.ylabel("Frequency")
plt.title("Distribution of (SDG14_4_1)")
plt.xticks([0, 0.5, 1])
plt.show()

In [None]:
# Tally how many rows carry each SDG14_4_1 value (0 or 1).
counts = merged_df.groupBy(col("SDG14_4_1")).count()
counts.show()

In [None]:
# Display the first five rows of the transformed merged table.
merged_df.show(n=5)

In [None]:
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.functions import col, when
import matplotlib.pyplot as plt

# Define Features (X) and Target (y = SDG14_4_1, renamed to "label" below)
features_to_predict = ["GDP", "MPA Percent", "Aquaculture", "Value"]
assembler = VectorAssembler(inputCols=features_to_predict, outputCol="features")
data = assembler.transform(merged_df)
data = data.withColumnRenamed("SDG14_4_1", "label")

# SDG14_4_1 is a 0/1 flag; make sure the label column is an integer.
data = data.withColumn("label", data["label"].cast("int"))

# Split BEFORE scaling: the scaler must learn its statistics from the training
# rows only. Fitting it on the full dataset leaks test-set information.
train_data, test_data = data.randomSplit([0.8, 0.2], seed=42)

# Standardize features using statistics from the training split
scaler = StandardScaler(inputCol="features", outputCol="scaled_features")
scaler_model = scaler.fit(train_data)
train_data = scaler_model.transform(train_data)
test_data = scaler_model.transform(test_data)

# Build and fit a Linear Regression model on the training data
lr = LinearRegression(featuresCol="scaled_features", labelCol="label")
lr_model = lr.fit(train_data)

# Make predictions on the test data
predictions = lr_model.transform(test_data)

# Evaluate the model (RegressionEvaluator reads "label" and "prediction" by default)
evaluator = RegressionEvaluator(metricName="mae")
mae = evaluator.evaluate(predictions)
print("Mean Absolute Error:", mae)

# Create a DataFrame for results
results = predictions.select("GDP", "MPA Percent", "Aquaculture", "Value", "label", "prediction")
results.show()

# Plot the model coefficients (on standardized features) as a rough indicator of predictor importance
feature_importance_list = [float(val) for val in lr_model.coefficients]
predictor_names = features_to_predict

plt.figure(figsize=(10, 6))
plt.bar(predictor_names, feature_importance_list)
plt.xlabel("Predictors")
plt.ylabel("Feature Importance")
plt.title("Predictor Importance in SDG14.4.1 Prediction")
plt.xticks(rotation=45)
plt.tight_layout()
plt.show()

# Linear regression outputs continuous scores, so testing `label == prediction`
# directly almost never matches. Threshold the score at 0.5 to obtain a 0/1
# class before counting correct predictions.
classified = predictions.withColumn(
    "predicted_class", when(col("prediction") >= 0.5, 1).otherwise(0)
)
correct_predictions = classified.filter(col("label") == col("predicted_class")).count()
total_predictions = classified.count()
wrong_predictions = total_predictions - correct_predictions
# Guard against an empty test split to avoid ZeroDivisionError.
if total_predictions:
    correct_percentage = (correct_predictions / total_predictions) * 100
    wrong_percentage = 100 - correct_percentage
else:
    correct_percentage = wrong_percentage = 0.0

# Create a Summary Table. Epochs and batch size don't apply to this model and are reported as 0.
model_name = "Linear Regression Model"
summary_data = [
    (model_name, "Number of Features", len(features_to_predict)),
    (model_name, "Number of Epochs", 0),
    (model_name, "Batch Size", 0),
    (model_name, "Mean Absolute Error (MAE)", mae),
    (model_name, "Total", total_predictions),
    (model_name, "Correct", correct_predictions),
    (model_name, "Wrong", wrong_predictions),
    (model_name, "Correct Percentage", correct_percentage),
    (model_name, "Wrong Percentage", wrong_percentage),
]

# Mixing Python ints and floats in one column defeats Spark's schema
# inference: an integer type is inferred and the float metrics (MAE,
# percentages) don't fit it. Store every value as a float so "Value" has one
# consistent type.
summary_table = spark.createDataFrame(
    [(model, metric, float(value)) for model, metric, value in summary_data],
    ["Model", "Metric", "Value"],
)

# Display the summary table
summary_table.show()


Now that we're done with this tutorial, let's move on to Spark DataFrame Operations!