In [None]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m2.4 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=267fbd16acca656037fd293898a1eeca0c4e66c3948c519d172ac4cd105f6edd
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [None]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("egd").getOrCreate()


In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
from pyspark.sql import functions as F
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, datediff, to_date, lit, min, lag, max, year, sum, when, desc, month
from pyspark.ml.feature import VectorAssembler, StringIndexer, OneHotEncoder
from pyspark.ml.regression import LinearRegression, DecisionTreeRegressor
from pyspark.ml.stat import Correlation
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.window import Window
import time
import psutil

In [None]:
covid_df = spark.read.csv("/content/drive/MyDrive/owid-covid-data.csv", inferSchema=True, header=True)


AnalysisException: [PATH_NOT_FOUND] Path does not exist: file:/content/drive/MyDrive/owid-covid-data.csv.

In [None]:
# Show 5 records
print("First 5 records:")
covid_df.show(5, truncate=False)

# Data Preprocessing (Fixing NULL values)
# Get the data_type for all columns in column_name: Data_Type format
print("\nData types of columns:")
for col_name, col_type in covid_df.dtypes:
    print(f"{col_name}: {col_type}")

# Get the data_type of all columns only Data_Type
column_types = [col_type for col_name, col_type in covid_df.dtypes]

# Count the number of double, string, and date columns
num_double_columns = column_types.count('double')
num_string_columns = column_types.count('string')
num_date_columns = column_types.count('date')

# Print the counts
print("\nCounts of column types:")
print(f"Number of double columns: {num_double_columns}")
print(f"Number of string columns: {num_string_columns}")
print(f"Number of date columns: {num_date_columns}")

# Show 5 records of all string fields
print("\nFirst 5 records of string fields:")
covid_df.select("iso_code", "location", "continent", "date", "tests_units").show(5, truncate=False)

# Get all the nulls in all the columns
# print("\nNull counts in all columns:")
# for c in covid_df.columns:
#    miss_vals = covid_df.select([F.count(F.when(F.isnull(c), c)).alias(c)])
#    miss_dic = miss_vals.collect()[0].asDict()
#    print(f"{c}: {miss_dic[c]}")

# Filling all NULL values
# Fix the continent null values
print("\nDistinct continents before fixing NULLs:")
covid_df.select("continent").distinct().show()

covid_dfc = covid_df.fillna({'continent': 'OWID'})
print("\nDistinct continents after fixing NULLs:")
covid_dfc.select("continent").distinct().show()

# Fix the tests_units null values
print("\nDistinct test units before fixing NULLs:")
covid_df.select("tests_units").distinct().show()

covid_dft = covid_dfc.fillna({'tests_units': 'no_info'})
print("\nDistinct test units after fixing NULLs:")
covid_dft.select("tests_units").distinct().show()

# Fix all double fields with null values covid_dff is final data frame
covid_dff = covid_dft.fillna(0)

# Final check on nulls
# print("\nFinal NULL check in all columns:")
# for c in covid_dff.columns:
#    miss_vals = covid_dff.select([F.count(F.when(F.isnull(c), c)).alias(c)])
#    miss_dic = miss_vals.collect()[0].asDict()
#    print(f"{c}: {miss_dic[c]}")

First 5 records:
+--------+---------+-----------+----------+-----------+---------+------------------+------------+----------+-------------------+-----------------------+---------------------+------------------------------+------------------------+----------------------+-------------------------------+-----------------+------------+------------------------+-------------+-------------------------+---------------------+---------------------------------+----------------------+----------------------------------+-----------+---------+------------------------+----------------------+------------------+-------------------------------+-------------+--------------+-----------+------------------+-----------------+-----------------------+--------------+----------------+-------------------------+------------------------------+-----------------------------+-----------------------------------+--------------------------+-------------------------------------+------------------------------+--------------

In [None]:
print("First 5 records:")
covid_dff.show(5, truncate=False)
covid_df_saved = covid_dff

First 5 records:
+--------+---------+-----------+----------+-----------+---------+------------------+------------+----------+-------------------+-----------------------+---------------------+------------------------------+------------------------+----------------------+-------------------------------+-----------------+------------+------------------------+-------------+-------------------------+---------------------+---------------------------------+----------------------+----------------------------------+-----------+---------+------------------------+----------------------+------------------+-------------------------------+-------------+--------------+-----------+------------------+-----------------+-----------------------+--------------+----------------+-------------------------+------------------------------+-----------------------------+-----------------------------------+--------------------------+-------------------------------------+------------------------------+--------------

In [None]:
# Convert 'date' to a numeric feature: days since the start of the dataset
reference_date = covid_dff.select(min(col("date"))).collect()[0][0]
covid_dff = covid_dff.withColumn("days_since_start", datediff(col("date"), lit(reference_date)))

# Handle 'location' via String Indexing and One-Hot Encoding
indexer = StringIndexer(inputCol="location", outputCol="location_index")
covid_dff = indexer.fit(covid_dff).transform(covid_dff)
encoder = OneHotEncoder(inputCols=["location_index"], outputCols=["location_encoded"])
covid_dff = encoder.fit(covid_dff).transform(covid_dff)

# Select numerical features including newly created 'days_since_start' and 'location_encoded'
numerical_features = [col_name for col_name, dtype in covid_dff.dtypes if dtype == 'double' or col_name in ["days_since_start", "location_encoded"]]

# Assemble features into a vector
assembler = VectorAssembler(inputCols=numerical_features, outputCol="features")
feature_vector_df = assembler.transform(covid_dff)

# Compute correlation matrix for all features
correlation_matrix = Correlation.corr(feature_vector_df, "features").head()[0]

# Extract correlations with 'total_deaths'
total_deaths_index = numerical_features.index('total_deaths')
correlations_with_deaths = [(feature, correlation_matrix[total_deaths_index, i]) for i, feature in enumerate(numerical_features) if feature != 'total_deaths']



In [None]:
# Extract correlations with 'total_deaths'
total_vaccinations_index = numerical_features.index('total_vaccinations')
correlations_with_total_vaccinations = [(feature, correlation_matrix[total_vaccinations_index, i]) for i, feature in enumerate(numerical_features) if feature != 'total_vaccinations_index']

In [None]:
# Select features with strong correlation with 'total_deaths'
threshold = 0.4
selected_features = [feature for feature, corr in correlations_with_deaths if abs(corr) > threshold]
print("Selected features based on correlation:", selected_features)


Selected features based on correlation: ['total_cases', 'total_vaccinations', 'people_vaccinated', 'people_fully_vaccinated', 'total_boosters', 'new_vaccinations', 'new_vaccinations_smoothed', 'total_vaccinations_per_hundred', 'people_vaccinated_per_hundred', 'people_fully_vaccinated_per_hundred', 'new_people_vaccinated_smoothed', 'life_expectancy', 'population', 'location_index']


In [None]:
# Select features with strong correlation with 'total_deaths'
threshold = 0.3
selected_features = [feature for feature, corr in correlations_with_total_vaccinations if abs(corr) > threshold]
print("Selected features based on correlation:", selected_features)


Selected features based on correlation: ['total_cases', 'total_deaths', 'total_vaccinations', 'people_vaccinated', 'people_fully_vaccinated', 'total_boosters', 'new_vaccinations', 'new_vaccinations_smoothed', 'people_vaccinated_per_hundred', 'people_fully_vaccinated_per_hundred', 'new_people_vaccinated_smoothed', 'cardiovasc_death_rate', 'life_expectancy', 'population']


In [None]:
# Prepare data for regression model
final_assembler = VectorAssembler(inputCols=selected_features, outputCol="final_features")
final_data = final_assembler.transform(covid_dff)
final_data = final_data.select(col("final_features").alias("features"), col("total_deaths").alias("label"))

# Split data into training and testing sets
train_data, test_data = final_data.randomSplit([0.7, 0.3], seed=42)



**Linear Regession:**

In [None]:
start_time = time.time()
# Train the linear regression model
lr = LinearRegression(featuresCol='features', labelCol='label')
lr_model = lr.fit(train_data)

training_summary = lr_model.summary
print("Training RMSE:", training_summary.rootMeanSquaredError)
print("Training R-squared:", training_summary.r2)


# Evaluate the model on testing data
test_results = lr_model.evaluate(test_data)
print("Testing RMSE:", test_results.rootMeanSquaredError)
print("Testing R-squared:", test_results.r2)
end_time = time.time()
wall_time  = end_time - start_time

cpu_times = psutil.cpu_times()
user_time = int(cpu_times.user * 1000)
sys_time = int(cpu_times.system * 1000)
other_time = int(cpu_times.idle * 1000) + int(cpu_times.iowait * 1000) + int(cpu_times.irq * 1000) + int(cpu_times.softirq * 1000)
total_time = user_time + sys_time + other_time
print("Execution Time:", wall_time, "seconds")
print(f"CPU times: user {user_time} ms, sys: {sys_time} ms, total: {total_time} ms")


Training RMSE: 6.643196358378366e-11
Training R-squared: 1.0
Testing RMSE: 6.527641301255345e-11
Testing R-squared: 1.0
Execution Time: 5.3446409702301025 seconds
CPU times: user 352890 ms, sys: 125460 ms, total: 1372660 ms


**Decision Trees Regression**

In [None]:
start_time = time.time()
# Initialize the Decision Tree Regressor
dt = DecisionTreeRegressor(featuresCol='features', labelCol='label')

# Train the model on the training data
dt_model = dt.fit(train_data)

# Make predictions on the training data and the test data
train_predictions = dt_model.transform(train_data)
test_predictions = dt_model.transform(test_data)

# Evaluate the model
evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")
train_rmse = evaluator.evaluate(train_predictions)
test_rmse = evaluator.evaluate(test_predictions)

evaluator.setMetricName("r2")
train_r2 = evaluator.evaluate(train_predictions)
test_r2 = evaluator.evaluate(test_predictions)
end_time = time.time()
wall_time  = end_time - start_time

cpu_times = psutil.cpu_times()
user_time = int(cpu_times.user * 1000)
sys_time = int(cpu_times.system * 1000)
other_time = int(cpu_times.idle * 1000) + int(cpu_times.iowait * 1000) + int(cpu_times.irq * 1000) + int(cpu_times.softirq * 1000)
total_time = user_time + sys_time + other_time

# Print out the metrics
print("Decision Tree - Training RMSE:", train_rmse)
print("Decision Tree - Training R-squared:", train_r2)
print("Decision Tree - Testing RMSE:", test_rmse)
print("Decision Tree - Testing R-squared:", test_r2)
print("Execution Time:", wall_time, "seconds")
print(f"CPU times: user {user_time} ms, sys: {sys_time} ms, total: {total_time} ms")
# Print out the Decision Tree model
print("Decision Tree Model:\n", dt_model.toDebugString)

Decision Tree - Training RMSE: 2523.1325259243276
Decision Tree - Training R-squared: 0.9983471217899501
Decision Tree - Testing RMSE: 2816.813732026079
Decision Tree - Testing R-squared: 0.9982475458955498
Execution Time: 9.396330833435059 seconds
CPU times: user 371780 ms, sys: 127200 ms, total: 1400490 ms
Decision Tree Model:
 DecisionTreeRegressionModel: uid=DecisionTreeRegressor_080886ecbdce, depth=5, numNodes=59, numFeatures=14
  If (feature 0 <= 3082262.0)
   If (feature 1 <= 8775.5)
    If (feature 1 <= 3604.5)
     If (feature 1 <= 1500.0)
      If (feature 1 <= 704.0)
       Predict: 77.13810110974106
      Else (feature 1 > 704.0)
       Predict: 1105.944297082228
     Else (feature 1 > 1500.0)
      If (feature 1 <= 2855.5)
       Predict: 2065.2770448548813
      Else (feature 1 > 2855.5)
       Predict: 3423.6568123393317
    Else (feature 1 > 3604.5)
     If (feature 1 <= 6116.5)
      If (feature 7 <= 3221.5)
       Predict: 4208.388888888889
      Else (feature 7 > 322

**Random Forest**

In [None]:
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator

# Initialize the Random Forest Regressor
rf = RandomForestRegressor(featuresCol='features', labelCol='label', numTrees=50, maxDepth=10)
start_time = time.time()
# Train the model on the training data
rf_model = rf.fit(train_data)

# Make predictions on the training data and the test data
train_predictions = rf_model.transform(train_data)
test_predictions = rf_model.transform(test_data)

# Evaluate the model
evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")
train_rmse = evaluator.evaluate(train_predictions)
test_rmse = evaluator.evaluate(test_predictions)

evaluator.setMetricName("r2")
train_r2 = evaluator.evaluate(train_predictions)
test_r2 = evaluator.evaluate(test_predictions)
end_time = time.time()
wall_time  = end_time - start_time

cpu_times = psutil.cpu_times()
user_time = int(cpu_times.user * 1000)
sys_time = int(cpu_times.system * 1000)
other_time = int(cpu_times.idle * 1000) + int(cpu_times.iowait * 1000) + int(cpu_times.irq * 1000) + int(cpu_times.softirq * 1000)
total_time = user_time + sys_time + other_time

# Print out the metrics
print("Random Forest - Training RMSE:", train_rmse)
print("Random Forest - Training R-squared:", train_r2)
print("Random Forest - Testing RMSE:", test_rmse)
print("Random Forest - Testing R-squared:", test_r2)
print("Execution Time:", wall_time, "seconds")
print(f"CPU times: user {user_time} ms, sys: {sys_time} ms, total: {total_time} ms")
# Feature Importance
feature_importances = rf_model.featureImportances
print("Feature Importances:", feature_importances)

Random Forest - Training RMSE: 1862.9306923674465
Random Forest - Training R-squared: 0.9990989391389138
Random Forest - Testing RMSE: 2019.2237341269238
Random Forest - Testing R-squared: 0.9990994677321766
Execution Time: 20.341695547103882 seconds
CPU times: user 409320 ms, sys: 131490 ms, total: 1463700 ms
Feature Importances: (14,[0,1,2,3,4,5,6,7,8,9,10,11,12,13],[0.32293374458591995,0.23043332703070493,0.11228442354545154,0.10354435672902539,0.10142125986160222,0.0021049503418782757,0.002760386032613102,0.004865205169821833,0.007407945853471837,0.005907593536451051,0.00313772388917029,0.01963154724449122,0.015284871153248374,0.06828266502614988])


**Linear Regressions Bayesian**

In [None]:
from pyspark.ml.regression import LinearRegression

# Initialize the Linear Regression model with L2 regularization
# ElasticNet parameter set to 0 implies L2 regularization (Ridge)
lr_bayesian = LinearRegression(featuresCol='features', labelCol='label', elasticNetParam=0, regParam=0.1)

# Train the model on the training data
lr_bayesian_model = lr_bayesian.fit(train_data)

# Make predictions on the training data and the test data
train_predictions = lr_bayesian_model.transform(train_data)
test_predictions = lr_bayesian_model.transform(test_data)

# Evaluate the model
from pyspark.ml.evaluation import RegressionEvaluator
evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")
start_time = time.time()
train_rmse = evaluator.evaluate(train_predictions)
test_rmse = evaluator.evaluate(test_predictions)

evaluator.setMetricName("r2")
train_r2 = evaluator.evaluate(train_predictions)
test_r2 = evaluator.evaluate(test_predictions)

end_time = time.time()
wall_time  = end_time - start_time

cpu_times = psutil.cpu_times()
user_time = int(cpu_times.user * 1000)
sys_time = int(cpu_times.system * 1000)
other_time = int(cpu_times.idle * 1000) + int(cpu_times.iowait * 1000) + int(cpu_times.irq * 1000) + int(cpu_times.softirq * 1000)
total_time = user_time + sys_time + other_time

# Print out the metrics
print("Bayesian Ridge - Training RMSE:", train_rmse)
print("Bayesian Ridge - Training R-squared:", train_r2)
print("Bayesian Ridge - Testing RMSE:", test_rmse)
print("Bayesian Ridge - Testing R-squared:", test_r2)
print("Execution Time:", wall_time, "seconds")
print(f"CPU times: user {user_time} ms, sys: {sys_time} ms, total: {total_time} ms")
# Coefficients and Intercept
print("Coefficients:", lr_bayesian_model.coefficients)
print("Intercept:", lr_bayesian_model.intercept)

Bayesian Ridge - Training RMSE: 1.1983578650488476
Bayesian Ridge - Training R-squared: 0.9999999996271504
Bayesian Ridge - Testing RMSE: 1.266440701851837
Bayesian Ridge - Testing R-squared: 0.9999999996457579
Execution Time: 2.4984242916107178 seconds
CPU times: user 417180 ms, sys: 131940 ms, total: 1480060 ms
Coefficients: [4.128050355243606e-06,0.9997679905479503,-1.970523710127557e-07,4.204241533122118e-07,-1.2658503292397088e-07,-6.207605777646792e-08,5.261418381127818e-07,1.6995798738909734e-06,0.056672666340748655,-0.05627504310410973,1.460219291143693e-06,0.0010381673177925128,0.01052612712519213,3.2829453912439093e-09]
Intercept: -1.0232334656637911


**Predicting the number of new cases in a day for a specific location based on the data from the previous day**


Create Lagged Features

## **SQL Code**

In [None]:
from pyspark.sql import Window
from pyspark.sql.functions import lag, col
from pyspark.sql.types import DoubleType, FloatType

# Define a window partitioned by location, ordered by date
windowSpec = Window.partitionBy("location").orderBy("date")

# Initialize the DataFrame to apply transformations
df = covid_df_saved

# Find numerical columns: assumes df is your DataFrame
numeric_cols = [f.name for f in df.schema.fields if isinstance(f.dataType, ( DoubleType, FloatType))]

# Create lagged features for each numerical column
for col_name in numeric_cols:
    df = df.withColumn(f"{col_name}_prev_day", lag(col(col_name)).over(windowSpec))

# Remove rows with nulls that result from the lagging operation (e.g., the first row for each location)
# We use a general condition to remove any row where any lagged feature is null
df = df.dropna(subset=[f"{col}_prev_day" for col in numeric_cols])

# Show the result to verify
df.show(5, truncate=False)

+--------+---------+-----------+----------+-----------+---------+------------------+------------+----------+-------------------+-----------------------+---------------------+------------------------------+------------------------+----------------------+-------------------------------+-----------------+------------+------------------------+-------------+-------------------------+---------------------+---------------------------------+----------------------+----------------------------------+-----------+---------+------------------------+----------------------+------------------+-------------------------------+-------------+--------------+-----------+------------------+-----------------+-----------------------+--------------+----------------+-------------------------+------------------------------+-----------------------------+-----------------------------------+--------------------------+-------------------------------------+------------------------------+-------------------------------

In [None]:
# Convert 'date' to a numeric feature: days since the start of the dataset
reference_date = df.select(min(col("date"))).collect()[0][0]
df = df.withColumn("days_since_start", datediff(col("date"), lit(reference_date)))

# Handle 'location' via String Indexing and One-Hot Encoding
indexer = StringIndexer(inputCol="location", outputCol="location_index")
df = indexer.fit(df).transform(df)
encoder = OneHotEncoder(inputCols=["location_index"], outputCols=["location_encoded"])
df = encoder.fit(df).transform(df)

# Selecting the features and the label for the model
feature_cols = [f"{col}_prev_day" for col in numeric_cols]  # plus any additional features you find relevant
all_features = feature_cols + ["days_since_start", "location_encoded"]
df = df.select(*all_features, "new_cases")

train_df, test_df = df.randomSplit([0.8, 0.2], seed=42)

**Logistic Regression**

In [None]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler


# Assemble features
assembler = VectorAssembler(inputCols=all_features, outputCol="features")
train_df = assembler.transform(train_df)
test_df = assembler.transform(test_df)

# Define the regression model
lr = LinearRegression(featuresCol='features', labelCol='new_cases')

# Fit the model
lr_model = lr.fit(train_df)

In [None]:
from pyspark.ml.evaluation import RegressionEvaluator

training_summary = lr_model.summary
print("Training RMSE:", training_summary.rootMeanSquaredError)
print("Training R-squared:", training_summary.r2)

# Evaluate the model on testing data
test_results = lr_model.evaluate(test_df)
print("Testing RMSE:", test_results.rootMeanSquaredError)
print("Testing R-squared:", test_results.r2)

Training RMSE: 218867.3111702118
Training R-squared: 0.1482920717766033
Testing RMSE: 220571.7542828557
Testing R-squared: 0.06406907906644987


**Decision Tree Regression**

In [None]:
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.evaluation import RegressionEvaluator

# Train a Decision Tree model
dt = DecisionTreeRegressor(featuresCol="features", labelCol="new_cases")
dt_model = dt.fit(train_df)

# Predictions
train_predictions = dt_model.transform(train_df)
test_predictions = dt_model.transform(test_df)

# Evaluation metrics
evaluator_rmse = RegressionEvaluator(labelCol="new_cases", predictionCol="prediction", metricName="rmse")
evaluator_r2 = RegressionEvaluator(labelCol="new_cases", predictionCol="prediction", metricName="r2")

    # Compute metrics
train_rmse = evaluator_rmse.evaluate(train_predictions)
test_rmse = evaluator_rmse.evaluate(test_predictions)
train_r2 = evaluator_r2.evaluate(train_predictions)
test_r2 = evaluator_r2.evaluate(test_predictions)

    # Print results in specified format
print(f"Training RMSE: {train_rmse:.2f}")
print(f"Training R-squared: {train_r2:.5f}")
print(f"Testing RMSE: {test_rmse:.2f}")
print(f"Testing R-squared: {test_r2:.5f}")
print("\n")  # Adds a newline for better readability between model outputs

Training RMSE: 216504.41
Training R-squared: 0.16658
Testing RMSE: 256215.21
Testing R-squared: -0.26286




**Random Forest With a 2 days mean**

In [None]:
from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import col, avg
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator


# Assuming you have already loaded your DataFrame `covid_df_saved`
# Define the window specification for moving average
windowSpec = Window.partitionBy("location").orderBy("date").rowsBetween(-2, 0)  # 2 days back and the current day

# Example features to compute the moving average
features_to_average = ["total_cases", "new_cases", "total_deaths", "new_deaths"]

## Start with the original DataFrame
covid_df_saved2 = covid_df_saved

# Add moving average columns to the DataFrame
for feature in features_to_average:
    covid_df_saved2 = covid_df_saved2.withColumn(f"{feature}_3day_avg", avg(col(feature)).over(windowSpec))

# Drop rows with null values in the newly created average columns
covid_df_saved2 = covid_df_saved2.dropna(subset=[f"{feature}_3day_avg" for feature in features_to_average])

# Select the necessary columns and prepare the final DataFrame for modeling
final_data = covid_df_saved2.select(
    col("new_cases").alias("label"),
    *[f"{feature}_3day_avg" for feature in features_to_average]
)

# Assemble the features into a single feature vector
assembler = VectorAssembler(inputCols=[f"{feature}_3day_avg" for feature in features_to_average], outputCol="features")

# Define the Random Forest regressor
rf = RandomForestRegressor(featuresCol="features", labelCol="label", numTrees=20)

# Build the pipeline
pipeline = Pipeline(stages=[assembler, rf])

# Split the data
train_data, test_data = final_data.randomSplit([0.7, 0.3], seed=42)

# Train the model
model = pipeline.fit(train_data)

# Make predictions
predictions = model.transform(test_data)

# Evaluate the predictions
evaluator_rmse = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")
evaluator_r2 = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="r2")

# Calculate RMSE and R-squared
rmse = evaluator_rmse.evaluate(predictions)
r2 = evaluator_r2.evaluate(predictions)

# Print the evaluation results
print(f"Root Mean Squared Error (RMSE) on test data = {rmse:.2f}")
print(f"R-squared on test data = {r2:.5f}")

Root Mean Squared Error (RMSE) on test data = 202786.86
R-squared on test data = 0.08942


**Predicting the New Cases Number Considering the Same-Day Data**

**Linear Regression**

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, datediff, lit, min
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import Pipeline


# Assuming 'new_cases' is the label for the model
covid_dff = covid_df_saved.withColumn("label", col("new_cases"))

# Ensure all features are used including the one-hot encoded and numerical features
numerical_features = [col_name for col_name, dtype in covid_dff.dtypes if dtype == 'double']


# Assemble features into a vector
assembler = VectorAssembler(inputCols=numerical_features, outputCol="features")
feature_vector_df = assembler.transform(covid_dff)

# Split data into training and testing sets
train_data, test_data = feature_vector_df.randomSplit([0.7, 0.3], seed=42)

# Define the regression model
lr = LinearRegression(featuresCol='features', labelCol='new_cases')

# Fit the model
lr_model = lr.fit(train_data)

training_summary = lr_model.summary
print("Training RMSE:", training_summary.rootMeanSquaredError)
print("Training R-squared:", training_summary.r2)

# Evaluate the model on testing data
test_results = lr_model.evaluate(test_data)
print("Testing RMSE:", test_results.rootMeanSquaredError)
print("Testing R-squared:", test_results.r2)

Training RMSE: 38.33829902314353
Training R-squared: 0.9999999694215495
Testing RMSE: 37.89490894085752
Testing R-squared: 0.999999980152723


**Random Forest**

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import DoubleType, FloatType
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator


# Automatically identify numeric columns for averaging, excluding the target column 'new_cases'
numeric_features = [f.name for f in covid_df_saved.schema.fields if isinstance(f.dataType, (DoubleType, FloatType)) and f.name != 'new_cases']

# Select the necessary columns and prepare the final DataFrame for modeling
final_data = covid_df_saved.select(
    col("new_cases").alias("label"),
    *numeric_features
)

# Assemble the features into a single feature vector
assembler = VectorAssembler(inputCols=numeric_features, outputCol="features")

# Define the Random Forest regressor
rf = RandomForestRegressor(featuresCol="features", labelCol="label", numTrees=20)

# Build the pipeline
pipeline = Pipeline(stages=[assembler, rf])

# Split the data
train_data, test_data = final_data.randomSplit([0.8, 0.2], seed=42)

# Train the model
model = pipeline.fit(train_data)

# Make predictions
predictions = model.transform(test_data)

# Evaluate the predictions
evaluator_rmse = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")
evaluator_r2 = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="r2")

# Calculate RMSE and R-squared
rmse = evaluator_rmse.evaluate(predictions)
r2 = evaluator_r2.evaluate(predictions)

# Print the evaluation results
print(f"Root Mean Squared Error (RMSE) on test data = {rmse:.2f}")
print(f"R-squared on test data = {r2:.5f}")

Root Mean Squared Error (RMSE) on test data = 82206.02
R-squared on test data = 0.78238


**Decision Tree Regression**

In [None]:
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.evaluation import RegressionEvaluator

# Train a Decision Tree model
covid_dff = DecisionTreeRegressor(featuresCol="features", labelCol="new_cases", maxBins=256)
dt_model = covid_dff.fit(train_data)

# Predictions
train_predictions = dt_model.transform(train_data)
test_predictions = dt_model.transform(test_data)

# Evaluation metrics
evaluator_rmse = RegressionEvaluator(labelCol="new_cases", predictionCol="prediction", metricName="rmse")
evaluator_r2 = RegressionEvaluator(labelCol="new_cases", predictionCol="prediction", metricName="r2")

    # Compute metrics
train_rmse = evaluator_rmse.evaluate(train_predictions)
test_rmse = evaluator_rmse.evaluate(test_predictions)
train_r2 = evaluator_r2.evaluate(train_predictions)
test_r2 = evaluator_r2.evaluate(test_predictions)

    # Print results in specified format
print(f"Training RMSE: {train_rmse:.2f}")
print(f"Training R-squared: {train_r2:.5f}")
print(f"Testing RMSE: {test_rmse:.2f}")
print(f"Testing R-squared: {test_r2:.5f}")
print("\n")  # Adds a newline for better readability between model outputs

Training RMSE: 38843.44
Training R-squared: 0.96861
Testing RMSE: 145966.00
Testing R-squared: 0.70553




**Gradient-Boosted**

In [None]:
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.evaluation import RegressionEvaluator

# List of different maxIter values to test
maxIter_values = [5, 20, 50]

# Initialize the evaluator
evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="r2")

# Dictionary to store R-squared values for each maxIter
r2_scores = {}

for maxIter in maxIter_values:
    # Define the Gradient-Boosted Trees regressor with current maxIter
    gbt = GBTRegressor(featuresCol="features", labelCol="label", maxIter=maxIter, maxBins=255)

    # Train the model
    model = gbt.fit(train_data)

    # Make predictions
    predictions = model.transform(test_data)

    # Calculate R-squared
    r2 = evaluator.evaluate(predictions)

    # Store the R-squared value
    r2_scores[maxIter] = r2

    # Print the results for current maxIter
    print(f"maxIter = {maxIter}: R-squared = {r2:.5f}")

# Find the maxIter value with the highest R-squared
best_maxIter = max(r2_scores, key=r2_scores.get)
print(f"Best maxIter: {best_maxIter} with R-squared: {r2_scores[best_maxIter]:.5f}")


maxIter = 5: R-squared = 0.70591


In [None]:
covid_df

NameError: name 'covid_df' is not defined

In [None]:
!pip install SQLAlchemy



# SQL Query| Top 5 countries with the highest max. new Covid-19 cases


In [None]:

from pyspark.sql.functions import sum as spark_sum, col, year, max, desc
from pyspark.sql.window import Window
from pyspark.sql.functions import rank

# Grouping by location and year, and calculating maximum new cases per million for each year
max_new_cases_per_year_df = covid_df.groupBy("location", year("date").alias("year")) \
    .agg(
        max("new_cases_per_million").alias("max_new_cases_per_million"),
        (spark_sum("population") / 1000).alias("total_population_in_millions")  # Adjust total population
    )

# Ordering by year and max_new_cases_per_million in descending order
max_new_cases_per_year_df = max_new_cases_per_year_df.orderBy("year", desc("max_new_cases_per_million"))

# Adding a column for maximum new cases as a percentage of the respective population
max_new_cases_per_year_df = max_new_cases_per_year_df.withColumn(
    "max_new_cases_percentage",
    (col("max_new_cases_per_million") / col("total_population_in_millions")) * 100  # Calculate percentage
)

# Window function to rank countries within each year
windowSpec = Window.partitionBy("year").orderBy(desc("max_new_cases_per_million"))

# Adding rank to each row within each year
ranked_df = max_new_cases_per_year_df.withColumn("rank", rank().over(windowSpec))

# Filtering to select the top country for each year
top_countries_top_peaks_per_year = ranked_df.filter(ranked_df["rank"] == 1)

# Presenting the result
top_countries_top_peaks_per_year.show()



Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m2.6 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=6059d4cf5c20fa3de13826f17689911bebb37deb2d6f2a14ffecc62a0e30f2af
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


NameError: name 'covid_df' is not defined

*Data result explanation example:*

-In 2020, Vatican had the highest maximum new cases per million with 17,326.733.



# **SQL Query| Total number of deaths from COVID-19 per continent and year**




In [None]:
# Filtering out rows with empty continents and countries
filtered_df = covid_df.filter(col("continent").isNotNull() & col("location").isNotNull())

# Extracting the year from the date column
df_with_year = filtered_df.withColumn("year", year(col("date")))

# Grouppinng by continent and year, and calculate the total deaths
total_deaths_per_year_continent = df_with_year.groupBy("continent", "year") \
    .agg(sum(when(col("total_deaths").isNull(), 0).otherwise(col("total_deaths"))).alias("total_deaths"))

# Presenting the final result
total_deaths_per_year_continent.orderBy("continent", "year").show()



NameError: name 'covid_df' is not defined

*Data result explanation*


-For example, in Africa continent, the total number of deaths from COVID-19 has been increasing over the years. In 2020, there were 7,179,617.0 deaths, which increased significantly in 2021,2022,2023 million deaths, before decreasing in 2024.

# **SQL Query| Case Fatility Rate: 20 cases showing average CFR**
(medicine indicator)

NOTE:Proportion of people who have been diagnosed with a certain disease and end up dying of it. Is not the same as disease's mortality rate: The CFR does not take into account the time period between disease onset and death.

In [None]:
# Calculate CFR: Total Deaths / Total Cases
covid_dff_with_cfr = covid_dff.withColumn("case_fatality_rate",
                                         F.when((F.col("total_cases") > 0) & (F.col("total_deaths").isNotNull()),
                                                F.col("total_deaths") / F.col("total_cases")).otherwise(None))

# Extract month and year from the date column
covid_dff_with_month_year = covid_dff_with_cfr.withColumn("month", F.month("date")).withColumn("year", F.year("date"))

# Group by continent, country, month, and year, and calculate average CFR
cfr_per_continent_country_month = covid_dff_with_month_year.filter((F.col("continent").isNotNull()) & (F.col("location").isNotNull())) \
                                                          .groupBy("continent", "location", "month", "year") \
                                                          .agg(F.avg("case_fatality_rate").alias("avg_cfr"))

# Exclude rows where avg_cfr is NULL
cfr_per_continent_country_month = cfr_per_continent_country_month.filter(F.col("avg_cfr").isNotNull())

# Show the result
cfr_per_continent_country_month.orderBy("continent", "location", "year", "month").show()

+---------+--------+-----+----+--------------------+
|continent|location|month|year|             avg_cfr|
+---------+--------+-----+----+--------------------+
|   Africa| Algeria|    3|2020|0.006394770498792098|
|   Africa| Algeria|    4|2020| 0.12352535358688674|
|   Africa| Algeria|    5|2020| 0.08863978500611278|
|   Africa| Algeria|    6|2020| 0.06949526179339287|
|   Africa| Algeria|    7|2020|0.053109164547303404|
|   Africa| Algeria|    8|2020|0.036479786795351146|
|   Africa| Algeria|    9|2020|  0.0335896156829439|
|   Africa| Algeria|   10|2020| 0.03392106309816489|
|   Africa| Algeria|   11|2020|0.032224164623873765|
|   Africa| Algeria|   12|2020|0.028395185512718062|
|   Africa| Algeria|    1|2021| 0.02741839473601339|
|   Africa| Algeria|    2|2021| 0.02666078215206401|
|   Africa| Algeria|    3|2021|0.026355111852053612|
|   Africa| Algeria|    4|2021|0.026408668457806414|
|   Africa| Algeria|    5|2021| 0.02681760988247228|
|   Africa| Algeria|    6|2021|0.0268015866606

*Data result explanation*

For example, in the Algeria, in 1st line, on 03.2020, the case fatility rate was 0.00663 aproximatelly.

# **SQL Query:Top 20 Lowest Test & Vaccination Metrics**


In [None]:


# Calculating test metrics
test_metrics = covid_dff.groupBy("continent", F.year("date").alias("year"), F.month("date").alias("month")) \
    .agg(F.sum("total_tests").alias("total_tests"),
         F.sum("total_cases").alias("total_cases"),
         F.avg("positive_rate").alias("avg_positive_rate"))

# Calculating vaccination metrics
vaccination_metrics = covid_dff.groupBy("continent", F.year("date").alias("year"), F.month("date").alias("month")) \
    .agg(F.sum("total_vaccinations").alias("total_vaccinations"),
         F.avg("total_vaccinations_per_hundred").alias("avg_vaccinations_per_hundred"))

# Joining test_metrics and vaccination_metrics
combined_metrics = test_metrics.join(vaccination_metrics,
                                     ["continent", "year", "month"],
                                     "inner") \
                               .orderBy("continent", "year", "month") \
                               .withColumn("test_positivity_status",
                                           F.when(F.col("avg_positive_rate") < 0.05, "Low Positivity")
                                           .when((F.col("avg_positive_rate") >= 0.05) & (F.col("avg_positive_rate") < 0.1), "Medium Positivity")
                                           .otherwise("High Positivity")) \
                               .withColumn("vaccination_status",
                                           F.when(F.col("avg_vaccinations_per_hundred") < 10, "Low Vaccination")
                                           .when((F.col("avg_vaccinations_per_hundred") >= 10) & (F.col("avg_vaccinations_per_hundred") < 30), "Medium Vaccination")
                                           .otherwise("High Vaccination")) \
                               .select("continent", "year", "month", "total_tests", "total_cases", "total_vaccinations",
                                       "avg_positive_rate", "avg_vaccinations_per_hundred", "test_positivity_status", "vaccination_status")

# Presenting the result
combined_metrics.show()

+---------+----+-----+-------------+------------+------------------+--------------------+----------------------------+----------------------+------------------+
|continent|year|month|  total_tests| total_cases|total_vaccinations|   avg_positive_rate|avg_vaccinations_per_hundred|test_positivity_status|vaccination_status|
+---------+----+-----+-------------+------------+------------------+--------------------+----------------------------+----------------------+------------------+
|   Africa|2020|    1|          0.0|         0.0|               0.0|                 0.0|                         0.0|        Low Positivity|   Low Vaccination|
|   Africa|2020|    2|        825.0|        14.0|               0.0|                 0.0|                         0.0|        Low Positivity|   Low Vaccination|
|   Africa|2020|    3|     347590.0|     25836.0|               0.0|0.007188285229202032|                         0.0|        Low Positivity|   Low Vaccination|
|   Africa|2020|    4|    6144070.

**Data result explanation**

-total-tests: TShows the cumulative number of COVID-19 tests conducted in Africa during the specified month and year.
For example, in January 2020, no tests were recorded (0.0), but as time progressed, the number of tests increased substantially.



-total_cases: Shows the total number of confirmed COVID-19 cases reported in Africa for the specified month and year.
It's show cases that the number of cases increased over time, reflecting the progression of the pandemic.

-total_vaccinations: Shows the total number of COVID-19 vaccinations administered in Africa during the specified month and year.
 Initially, there were no vaccinations (0.0), but as vaccination efforts began, the number of administered doses increased.

-average_positive_rate: Shows the average positivity rate of COVID-19 tests.
For example, a positive rate of 0.0104 in May 2021 indicates that about 1,04% of COVID-19 tests conducted in Africa during that month were positive, so low positivity.

-average_vaccinations_per_hundred: Shows the average number of COVID-19 vaccinations administered per hundred people in Africa for the specified month and year.Since no vacines until 12.2020 in Africa, average in null, for example.


-test_positivity_status:Shows positivity rate into "Low Positivity," "Medium Positivity," or "High Positivity" based on predefined thresholds.
For instance, a positivity rate below 0.05 is classified as "Low Positivity", saw until 06.2021 in Africa.

-vaccination_status: This categorizes the average vaccinations per hundred people into "Low Vaccination," "Medium Vaccination," or "High Vaccination" based on predefined thresholds.
For example, if the average vaccinations per hundred people are below 10, it's categorized as "Low Vaccination." Saw still in Africa in 2020.

# **SQL Query|20 cases with Mortality Metrics: Hospitalizations,ICU patients, Deaths/million**




In [None]:
# Calculate mortality metrics
mortality_metrics = covid_dff.groupBy(F.year("date").alias("year"), F.month("date").alias("month")) \
    .agg(F.sum("excess_mortality").alias("total_excess_mortality"),
         F.avg("excess_mortality_cumulative_per_million").alias("avg_excess_mortality_per_million"))

# Calculate hospitalization metrics
hospitalization_metrics = covid_dff.groupBy(F.year("date").alias("year"), F.month("date").alias("month")) \
    .agg(F.avg("icu_patients").alias("avg_icu_patients"),
         F.avg("hosp_patients").alias("avg_hosp_patients"))

# Join mortality_metrics and hospitalization_metrics
combined_metrics = mortality_metrics.join(hospitalization_metrics,
                                          ["year", "month"],
                                          "inner") \
                                    .orderBy("year", "month")

# Show the result
combined_metrics.show()

+----+-----+----------------------+--------------------------------+--------------------+--------------------+
|year|month|total_excess_mortality|avg_excess_mortality_per_million|    avg_icu_patients|   avg_hosp_patients|
+----+-----+----------------------+--------------------------------+--------------------+--------------------+
|2020|    1|   -1255.1599999999999|             -1.2084358160317938|                 0.0|2.999400119976004...|
|2020|    2|    -771.3699999999998|             -2.2118927190239153|0.044771968854282536| 0.23164627363737486|
|2020|    3|     518.7499999999999|             -3.4233252393416813|  16.365560862269266|   100.7710081321802|
|2020|    4|     3575.909999999999|             -0.5181508891375662|   80.79801587301587|   456.7612433862434|
|2020|    5|     2688.700000000001|               1.567511376779314|  37.568612391193035|  267.42345110087047|
|2020|    6|               2628.51|              2.7150970301984128|  20.344576719576718|  134.59259259259258|
|

*Data Result Explanation:*

This table presents mortality and hospitalization metrics aggregated by year and month:

Month: Is from 1 (january) to 12 (december)
-total_excess_mortality: Shows the number of deaths observed above what would be expected under normal circumstances. A negative value indicates that the observed mortality was lower than expected, while a positive value indicates excess mortality.For example, in 01.2020, the total excess mortality was approximately -1255.16, indicating that there was less mortality observed compared to what was predicted.


-avg_excess_mortality_per_million: Shows the average excess mortality per million people for each month.For example, in 01.2020, the average excess mortality per million people was approximately -1.20.

-avg_icu_patients: Shows the average number of patients in intensive care units (ICUs) for each month.For example, in 03.2020, the average number of ICU patients was approximately 16.36.

-avg_hospitalized_patients: Shows the average number of hospitalized patients for each month.For example, in 03.2020, the average number of hospitalized patients was approximately 100.71.

# **SQL Query|Top 20 Lowest Test & Vaccination Metrics (for Germany only)**

NOTE: This query is a modification of the "Top 20 Lowest Test & Vaccination Metrics" contempling only German case. The explations are the same provided in the case related.

In [None]:
# Filtering data for Germany
germany_data = covid_dff.filter(covid_dff.location == "Germany")

# Calculating test metrics
germany_test_metrics = germany_data.groupBy( F.month("date").alias("month"), F.year("date").alias("year")) \
    .agg(F.sum("total_tests").alias("total_tests"),
         F.sum("total_cases").alias("total_cases"),
         F.avg("positive_rate").alias("avg_positive_rate"))

# Calculating vaccination metrics
germany_vaccination_metrics = germany_data.groupBy(F.year("date").alias("year"), F.month("date").alias("month")) \
    .agg(F.sum("total_vaccinations").alias("total_vaccinations"),
         F.avg("total_vaccinations_per_hundred").alias("avg_vaccinations_per_hundred"))

# Joining test_metrics and vaccination_metrics
germany_combined_metrics = germany_test_metrics.join(germany_vaccination_metrics,
                                     ["year", "month"],
                                     "inner") \
                               .orderBy("year", "month") \
                               .withColumn("test_positivity_status",
                                           F.when(F.col("avg_positive_rate") < 0.05, "Low Positivity")
                                           .when((F.col("avg_positive_rate") >= 0.05) & (F.col("avg_positive_rate") < 0.1), "Medium Positivity")
                                           .otherwise("High Positivity")) \
                               .withColumn("vaccination_status",
                                           F.when(F.col("avg_vaccinations_per_hundred") < 10, "Low Vaccination")
                                           .when((F.col("avg_vaccinations_per_hundred") >= 10) & (F.col("avg_vaccinations_per_hundred") < 30), "Medium Vaccination")
                                           .otherwise("High Vaccination")) \
                               .select("year", "month", "total_tests", "total_cases", "total_vaccinations",
                                       "avg_positive_rate", "avg_vaccinations_per_hundred", "test_positivity_status", "vaccination_status")

# Presenting the result
germany_combined_metrics.show()

+----+-----+------------+------------+------------------+--------------------+----------------------------+----------------------+------------------+
|year|month| total_tests| total_cases|total_vaccinations|   avg_positive_rate|avg_vaccinations_per_hundred|test_positivity_status|vaccination_status|
+----+-----+------------+------------+------------------+--------------------+----------------------------+----------------------+------------------+
|2020|    1|         0.0|        33.0|               0.0|                 0.0|                         0.0|        Low Positivity|   Low Vaccination|
|2020|    2|         0.0|       611.0|               0.0|                 0.0|                         0.0|        Low Positivity|   Low Vaccination|
|2020|    3|   1514540.0|    462500.0|               0.0|0.006935483870967742|                         0.0|        Low Positivity|   Low Vaccination|
|2020|    4|   7390987.0|   3640512.0|               0.0|0.009566666666666666|                      

# **SQL Query|20 cases with Mortality Metrics: Hospitalizations,ICU patients, Deaths/million(for Germany only)**

NOTE: This query is a modification of the "20 cases with Mortality Metrics: Hospitalizations,ICU patients, Deaths/million" contempling only German case. The explations are the same provided in the case related.

In [None]:
# Filter data for Germany
germany_data = covid_dff.filter(covid_dff.location == "Germany")

# Calculate mortality metrics
germany_mortality_metrics = germany_data.groupBy(F.year("date").alias("year"), F.month("date").alias("month")) \
    .agg(F.sum("excess_mortality").alias("total_excess_mortality"),
         F.avg("excess_mortality_cumulative_per_million").alias("avg_excess_mortality_per_million"))

# Calculate hospitalization metrics
germany_hospitalization_metrics = germany_data.groupBy(F.year("date").alias("year"), F.month("date").alias("month")) \
    .agg(F.avg("icu_patients").alias("avg_icu_patients"),
         F.avg("hosp_patients").alias("avg_hosp_patients"))

# Join mortality_metrics and hospitalization_metrics
germany_combined_metrics = germany_mortality_metrics.join(germany_hospitalization_metrics,
                                          ["year", "month"],
                                          "inner") \
                                    .orderBy("year", "month")

# Show the result
germany_combined_metrics.show()

+----+-----+----------------------+--------------------------------+------------------+-----------------+
|year|month|total_excess_mortality|avg_excess_mortality_per_million|  avg_icu_patients|avg_hosp_patients|
+----+-----+----------------------+--------------------------------+------------------+-----------------+
|2020|    1|                -18.42|             -3.5928984444444443|               0.0|              0.0|
|2020|    2|                -35.57|             -12.831649137931036|               0.0|              0.0|
|2020|    3|   -28.369999999999997|             -32.208632903225805|299.19354838709677|              0.0|
|2020|    4|    31.799999999999997|              -21.37351266666667|2597.4666666666667|              0.0|
|2020|    5|   0.43999999999999995|             -21.940237741935483|1297.6774193548388|              0.0|
|2020|    6|                  2.86|                      -17.985495| 453.3333333333333|              0.0|
|2020|    7|                -17.23|           