In [1]:
import pyspark
from pyspark.sql import SparkSession

# Start (or reuse) a Spark session named 'test' on the local master "local[*]".
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('test')
    .getOrCreate()
)

23/11/25 01:11:11 WARN Utils: Your hostname, codespaces-a019c6 resolves to a loopback address: 127.0.0.1; using 172.16.5.4 instead (on interface eth0)
23/11/25 01:11:11 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/11/25 01:11:12 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [18]:
# Parquet input directories, given relative to the working directory.
input_records = '../data/records/'
input_loc = '../data/loc/'

# Read each dataset and expose it to Spark SQL under its own view name.
df_records = spark.read.parquet(input_records)
df_records.createOrReplaceTempView('records')

df_loc = spark.read.parquet(input_loc)
df_loc.createOrReplaceTempView('loc')

# Build the typed, location-enriched table:
#   - ObservationDate is parsed from 'MM/dd/yyyy' text into a DATE,
#   - Confirmed / Deaths / Recovered are cast to INTEGER,
#   - Lat / Long are cast to FLOAT,
#   - `loc` is attached with a LEFT JOIN, so records without a matching
#     location row are kept (their Lat/Long come out NULL).
# NOTE(review): the join key is `Province/State` alone. Confirm province names
# are unique in `loc`; otherwise records are duplicated or matched to the
# wrong place. Lat/Long are unqualified and presumably come from `loc` — verify.
df_result = spark.sql("""
select    SNo, 
          TO_DATE(ObservationDate, 'MM/dd/yyyy') as ObservationDate, 
          r.`Province/State`, 
          r.`Country/Region`,
          CAST(Confirmed as INTEGER) as Confirmed, 
          CAST(Deaths as INTEGER) as Deaths, 
          CAST(Recovered as INTEGER) as Recovered, 
          CAST(Lat as FLOAT) as Lat, 
          CAST(Long as FLOAT) as Long
          from records r left join loc l
          on r.`Province/State` = l.`Province/State`
""")

In [19]:
# Register the joined, typed result as the `covid` view so it can be queried by name.
df_result.createOrReplaceTempView('covid')

In [28]:
# Per-province snapshot for the single observation date 2021-05-29.
# Rows whose Province/State is 'Unknown' are excluded, and max() collapses each
# province's rows into one row of counts and coordinates, ordered by province.
# NOTE(review): the "Final*" aliases presumably treat 2021-05-29 as the last
# date in the dataset — confirm against the source data.
data = spark.sql("""
select `Province/State`, max(Confirmed) as FinalConfirmed, max(Deaths) as FinalDeaths, max(Recovered) as FinalRecovered, max(Lat) as Lat, max(Long) as Long
from covid
where ObservationDate='2021-05-29' and `Province/State`!='Unknown'
group by `Province/State`
order by `Province/State`
          """
)

In [29]:
# Print the column names and types of the per-province frame.
data.printSchema()

root
 |-- Province/State: string (nullable = true)
 |-- FinalConfirmed: integer (nullable = true)
 |-- FinalDeaths: integer (nullable = true)
 |-- FinalRecovered: integer (nullable = true)
 |-- Lat: float (nullable = true)
 |-- Long: float (nullable = true)



In [35]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
# Pack the two coordinate columns into a single 'features' vector column.
assembler = VectorAssembler(
    inputCols=['Lat', 'Long'],
    outputCol='features',
)

# Standalone view of the assembled features next to the label column.
# NOTE(review): `df_assembled` / `df` are not used by the later visible cells.
# The split below is taken from the un-assembled `data`, and the pipeline
# applies the assembler itself. Confirm whether these two frames are still needed.
df_assembled = assembler.transform(data)
df = df_assembled.select('features', 'FinalConfirmed')

# 80/20 train/test split with a fixed seed.
train_data, test_data = data.randomSplit(weights=[0.8, 0.2], seed=1234)

In [45]:
# Linear regression predicting FinalConfirmed, with regParam set to 0.2.
lr = LinearRegression(labelCol='FinalConfirmed', regParam=0.2)

# Chain feature assembly and the regressor, then fit both on the training split.
pipeline = Pipeline(stages=[assembler, lr])
model = pipeline.fit(train_data)

In [46]:
# Score the test split, then show label, prediction and feature vector side by side.
predictions = model.transform(test_data)
(
    predictions
    .select('FinalConfirmed', 'prediction', 'features')
    .show()
)

+--------------+------------------+--------------------+
|FinalConfirmed|        prediction|            features|
+--------------+------------------+--------------------+
|          1059|2968.0020392221277|[40.1823997497558...|
|           194|3199.1471049302363|[35.7518005371093...|
|           616|3340.8171580007233|[35.1916999816894...|
|           253| 3159.183641298928|[37.5777015686035...|
|          1025|3696.4332189245156|[30.6170997619628...|
|           393|3074.5913284561007|[39.3054008483886...|
|             1|3317.4813157284825|[31.6926994323730...|
+--------------+------------------+--------------------+



In [47]:
from pyspark.ml.evaluation import RegressionEvaluator

# Evaluate the predictions against FinalConfirmed using the 'rmse' metric.
evaluator = RegressionEvaluator(
    metricName='rmse',
    labelCol='FinalConfirmed',
    predictionCol='prediction',
)
rmse = evaluator.evaluate(predictions)
print(f"Root Mean Squared Error (RMSE): {rmse}")

Root Mean Squared Error (RMSE): 2774.173380695434
