In [1]:
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.sql.functions import col
from pyspark.ml.feature import StringIndexer

spark = SparkSession.builder.appName("Covid19_confirmed cases_predicting_model").getOrCreate()


Reading data from datalake

In [2]:

# Load the CSV file into a DataFrame
df1 = spark.read.csv("hdfs://localhost:9000/datalake/africa_geopoll_coronavirus_r2_weighteddata_5_18.csv", header=True, inferSchema=True)
df2 = spark.read.csv("hdfs://localhost:9000/datalake/africa_geopoll_round2.csv", header=True, inferSchema=True)

Data Preprocessing with PySpark

In [3]:
# Get the column names for each DataFrame
columns_df1 = set(df1.columns)
columns_df2 = set(df2.columns)

# Find columns differences
columns_only_in_df1 = columns_df1 - columns_df2
columns_only_in_df2 = columns_df2 - columns_df1

# Find columns present in both DataFrames
common_columns = columns_df1 & columns_df2

print("Columns present in df1 but not in df2:", columns_only_in_df1)
print("Columns present in df2 but not in df1:", columns_only_in_df2)
print("Columns present in both:", common_columns)

Columns present in df1 but not in df2: {'PreventativeMeasures_Avoiding public transport', 'CommercialTrust_Other', 'Groups_Brands', 'InitiativeTaken_None', 'PreventativeMeasures_Other', 'Groups_Governments', 'CommercialTrust_Telecommuncations', 'Groups_Private sector', 'CommercialTrust_Brands', 'SurveyId', 'InformationSources_Radio', 'Groups_NGOs', 'InformationSources_Government messages', 'PreventativeMeasures_Increasing hygiene', 'Groups_Retailers', 'Groups_All of the above', 'Cut', 'CommercialTrust_Retailers', 'InitiativeTaken_Limited store traffic', 'InformationSources_Social Media', 'Batch', 'InitiativeTaken_Sanitiser offered', 'CommercialTrust_Banks', 'InformationSources_TV', 'Mobile Number', 'PreventativeMeasures_Avoiding public places', 'InformationSources_Newspapers', 'PreventativeMeasures_Working from home', 'InformationSources_Other', 'InitiativeTaken_Sectioned intercepts', 'InitiativeTaken_Staff in protective gear', 'InformationSources_Friends/family', 'Admin2', 'UserId'}
C

Create a new DataFrame with only the common columns

In [4]:

common_columns_df1 = df1.select(*common_columns)
common_columns_df2 = df1.select(*common_columns)


# Append the two DataFrames
appended_common_columns_df = common_columns_df1.union(common_columns_df2)


appended_common_columns_df.show()


+-----------------+---------+---+--------------+------------+--------------+--------------------+------+-----------+--------------------+-------------+--------------------+-----------------+------------------+------------+---------+--------------------+--------------+---------------+----------------+-------------+-------------+---------------+---+--------------------+---------+--------------------+-----------------+----------------+---------+
|NonEssentialItems|Awareness|SEC|FoodLocations2|LevelConcern|HealthBehavior|       BrandPurchase|Gender|Urban/Rural|          FoodAmount|RiskAwareness|            Concerns|MarketOperability|            Admin1|StayPositive|Foodworry|        FoodShopping|EconomicImpact|GovernmentTrust|MediaConsumption|FoodLocations|  SocialMedia|VirusPrevention|Age|              Tested|BirthYear|             Country|      HandWashing|SocialDistancing|Age Group|
+-----------------+---------+---+--------------+------------+--------------+--------------------+------+--

Remove columns with empty values

In [5]:
def cleaning_data(df):
    # Create a list of columns with missing values
    cols_with_missing = [col_name for col_name in df.columns if df.filter(col(col_name).isNull()).count() > 0]

    # Drop columns with missing values
    df_cleaned = df.drop(*cols_with_missing)

    return df_cleaned

# Dropping a specific column
appended_common_columns_df = appended_common_columns_df.drop("Admin1")

df_cleaned = cleaning_data(appended_common_columns_df)


In [6]:
num_columns_old = len(appended_common_columns_df.columns)
print("No of columns before: ", num_columns_old)

num_columns_new = len(df_cleaned.columns)
print("No of columns after: ", num_columns_new)

No of columns before:  29
No of columns after:  27


Modifying the test column to show only relevant values

i.e. Only those tested 

In [7]:
# Count occurrences of each value in the "Tested" column
test_counts = df_cleaned.groupBy("Tested").count()

# Display the results
print("Tested Column Value Counts:")
test_counts.show()

Tested Column Value Counts:
+--------------------+-----+
|              Tested|count|
+--------------------+-----+
|Yes - tested posi...|  970|
|                  No| 5326|
|Yes - tested nega...| 1418|
|            Not sure| 1086|
+--------------------+-----+



In [8]:
desired_values = ["Yes - tested positive", "Yes - tested negative"]

# Filter the DataFrame to include only rows with the specified values in the "Tested" column
filtered_df = df_cleaned.filter(col("Tested").isin(desired_values))

# Count occurrences of each value in the "Tested" column
test_counts = filtered_df.groupBy("Tested").count()

# Display the results
print("Tested Column Value Counts:")
test_counts.show()

Tested Column Value Counts:
+--------------------+-----+
|              Tested|count|
+--------------------+-----+
|Yes - tested posi...|  970|
|Yes - tested nega...| 1418|
+--------------------+-----+



Getting the column types

In [9]:
# Get the data types of the columns
column_data_types = filtered_df.dtypes

# Display the results
print("Data Types of Columns:")
for col_name, col_type in column_data_types:
    print(f"{col_name}: {col_type}")

Data Types of Columns:
NonEssentialItems: string
Awareness: string
FoodLocations2: string
LevelConcern: string
HealthBehavior: string
BrandPurchase: string
Gender: string
Urban/Rural: string
FoodAmount: string
RiskAwareness: string
Concerns: string
MarketOperability: string
Foodworry: string
FoodShopping: string
EconomicImpact: string
GovernmentTrust: string
MediaConsumption: string
FoodLocations: string
SocialMedia: string
VirusPrevention: string
Age: string
Tested: string
BirthYear: string
Country: string
HandWashing: string
SocialDistancing: string
Age Group: string


Encode string columns to double values

In [10]:
# List of string column names to encode
string_columns_to_encode = ["Gender", "VirusPrevention", "MarketOperability", "LevelConcern", "Concerns", "EconomicImpact", "Awareness", "Age Group", "Foodworry", "Tested", "NonEssentialItems", "RiskAwareness", "Urban/Rural", "SocialMedia", "FoodLocations2", "HandWashing", "FoodShopping", "MediaConsumption", "FoodLocations", "SocialDistancing", "Country", "HealthBehavior", "FoodAmount", "BrandPurchase"]

# Create a Pipeline to apply StringIndexer to each string column
indexers = [StringIndexer(inputCol=col_name, outputCol=col_name+"_index", handleInvalid="keep") for col_name in string_columns_to_encode]
pipeline = Pipeline(stages=indexers)
df_encoded = pipeline.fit(filtered_df).transform(filtered_df)


# Drop the original string columns as the df_encoded above contains the original columns + the indexed ones
df_encoded = df_encoded.drop(*string_columns_to_encode)

# Show the DataFrame with encoded values
df_encoded.show()

+---------------+---+---------+------------+---------------------+-----------------------+------------------+--------------+--------------------+---------------+---------------+---------------+------------+-----------------------+-------------------+-----------------+-----------------+--------------------+-----------------+------------------+----------------------+-------------------+----------------------+-------------+--------------------+----------------+-------------------+
|GovernmentTrust|Age|BirthYear|Gender_index|VirusPrevention_index|MarketOperability_index|LevelConcern_index|Concerns_index|EconomicImpact_index|Awareness_index|Age Group_index|Foodworry_index|Tested_index|NonEssentialItems_index|RiskAwareness_index|Urban/Rural_index|SocialMedia_index|FoodLocations2_index|HandWashing_index|FoodShopping_index|MediaConsumption_index|FoodLocations_index|SocialDistancing_index|Country_index|HealthBehavior_index|FoodAmount_index|BrandPurchase_index|
+---------------+---+---------+---

In [11]:
# Get the data types of the columns
column_data_types = df_encoded.dtypes

# Display the results
print("Data Types of Columns:")
for col_name, col_type in column_data_types:
    print(f"{col_name}: {col_type}")

Data Types of Columns:
GovernmentTrust: string
Age: string
BirthYear: string
Gender_index: double
VirusPrevention_index: double
MarketOperability_index: double
LevelConcern_index: double
Concerns_index: double
EconomicImpact_index: double
Awareness_index: double
Age Group_index: double
Foodworry_index: double
Tested_index: double
NonEssentialItems_index: double
RiskAwareness_index: double
Urban/Rural_index: double
SocialMedia_index: double
FoodLocations2_index: double
HandWashing_index: double
FoodShopping_index: double
MediaConsumption_index: double
FoodLocations_index: double
SocialDistancing_index: double
Country_index: double
HealthBehavior_index: double
FoodAmount_index: double
BrandPurchase_index: double


Type casting string values saved as double

In [12]:
df000 = df_encoded.withColumn("Age", col("Age").cast("double"))
df001 = df000.withColumn("BirthYear", col("BirthYear").cast("double"))
df = df001.withColumn("GovernmentTrust", col("GovernmentTrust").cast("double"))

# Get the data types of the columns
column_data_types = df.dtypes
print("Data Types of Columns After Casting to double:")
for col_name, col_type in column_data_types:
    print(f"{col_name}: {col_type}")

Data Types of Columns After Casting to double:
GovernmentTrust: double
Age: double
BirthYear: double
Gender_index: double
VirusPrevention_index: double
MarketOperability_index: double
LevelConcern_index: double
Concerns_index: double
EconomicImpact_index: double
Awareness_index: double
Age Group_index: double
Foodworry_index: double
Tested_index: double
NonEssentialItems_index: double
RiskAwareness_index: double
Urban/Rural_index: double
SocialMedia_index: double
FoodLocations2_index: double
HandWashing_index: double
FoodShopping_index: double
MediaConsumption_index: double
FoodLocations_index: double
SocialDistancing_index: double
Country_index: double
HealthBehavior_index: double
FoodAmount_index: double
BrandPurchase_index: double


Selecting the column to be predicted

In [13]:
# Define the new order of columns
new_order = ["GovernmentTrust", "Age", "BirthYear", "Gender_index", "VirusPrevention_index", 
             "MarketOperability_index", "LevelConcern_index", "Concerns_index", "EconomicImpact_index",
             "Awareness_index", "Age Group_index", "Foodworry_index", 
             "NonEssentialItems_index", "RiskAwareness_index", "Urban/Rural_index", 
             "SocialMedia_index", "FoodLocations2_index", "HandWashing_index", 
             "FoodShopping_index", "MediaConsumption_index", "FoodLocations_index", 
             "SocialDistancing_index", "Country_index", "HealthBehavior_index", 
             "FoodAmount_index", "BrandPurchase_index",  "Tested_index"]

df_new_order = df.select([col(column_name) for column_name in new_order])

df_new_order.show()

+---------------+----+---------+------------+---------------------+-----------------------+------------------+--------------+--------------------+---------------+---------------+---------------+-----------------------+-------------------+-----------------+-----------------+--------------------+-----------------+------------------+----------------------+-------------------+----------------------+-------------+--------------------+----------------+-------------------+------------+
|GovernmentTrust| Age|BirthYear|Gender_index|VirusPrevention_index|MarketOperability_index|LevelConcern_index|Concerns_index|EconomicImpact_index|Awareness_index|Age Group_index|Foodworry_index|NonEssentialItems_index|RiskAwareness_index|Urban/Rural_index|SocialMedia_index|FoodLocations2_index|HandWashing_index|FoodShopping_index|MediaConsumption_index|FoodLocations_index|SocialDistancing_index|Country_index|HealthBehavior_index|FoodAmount_index|BrandPurchase_index|Tested_index|
+---------------+----+---------+

Building prediction model

In [14]:
import numpy as np
from sklearn.preprocessing import StandardScaler
from imblearn.over_sampling import RandomOverSampler

Splitting data for model


In [15]:
# Convert PySpark DataFrame to Pandas DataFrame
df_used = df_new_order.toPandas()

train, valid, test = np.split(df_used.sample (frac=1), [int(0.6*len(df_used)), int(0.8*len(df_used))])


  return bound(*args, **kwds)


In [16]:
column_names = df_used.columns
predicted_column = column_names[-1]

print("Predicted Column:", predicted_column)

Predicted Column: Tested_index


In [17]:
def scale_dataset(dataframe , oversample = False ):
    X = dataframe[dataframe.columns[:-1]].values
    y = dataframe[dataframe.columns[-1]].values
    
    scaler = StandardScaler()

    X = scaler.fit_transform(X)
    
    if oversample:
        ros = RandomOverSampler()
        X , y = ros.fit_resample(X , y)
    
    data = np.hstack(( X, np.reshape(y , (-1 , 1)) ))

    return data, X, y


train, X_train, y_train = scale_dataset(train, oversample = True)
valid, X_valid, y_valid = scale_dataset(valid, oversample = False)
test, X_test, y_test = scale_dataset(test, oversample = False)

Histogram based Gradient Boosting

In [18]:
from sklearn.metrics import classification_report
from sklearn.ensemble import HistGradientBoostingClassifier


HBGmodel = HistGradientBoostingClassifier()
HBGmodel.fit(X_train, y_train)


In [19]:
y_predictions = HBGmodel.predict(X_test)
print(classification_report(y_test, y_predictions))

              precision    recall  f1-score   support

         0.0       0.83      0.86      0.84       276
         1.0       0.79      0.76      0.77       202

    accuracy                           0.81       478
   macro avg       0.81      0.81      0.81       478
weighted avg       0.81      0.81      0.81       478



In [20]:
y_predictions = HBGmodel.predict(X_valid)
print(classification_report(y_valid, y_predictions))

              precision    recall  f1-score   support

         0.0       0.83      0.85      0.84       294
         1.0       0.75      0.72      0.74       184

    accuracy                           0.80       478
   macro avg       0.79      0.79      0.79       478
weighted avg       0.80      0.80      0.80       478



In [21]:
# Stop the Spark session
spark.stop()