In [1]:
import pyspark
import pandas as pd
import numpy as np

In [2]:
# Setting Spark environments

import os

os.environ['SPARK_HOME'] = '/Users/atharvakarnik/spark-3.5.1-bin-hadoop3'
os.environ['PYSPARK_PYTHON'] = 'python'
os.environ['PYSPARK_DRIVER_PYTHON'] = 'jupyter'
os.environ['PYSPARK_DRIVER_PYTHON_OPTS'] = 'lab'

In [29]:
# Creating a literally (not truely though) random dataframe

rows = np.random.randint(10, 13)
data = {
    "Temperature_C": np.random.uniform(20, 35, size=rows).round(2),
    "Humidity": np.random.uniform(30, 90, size=rows).round(2)
}

df = pd.DataFrame(data)

display(df)

Unnamed: 0,Temperature_C,Humidity
0,23.25,32.28
1,20.2,49.0
2,35.0,77.27
3,22.14,48.56
4,28.28,82.33
5,31.67,53.59
6,31.32,60.72
7,23.25,73.61
8,34.58,65.37
9,24.98,41.08


In [30]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('RandomApp').getOrCreate()

In [31]:
spark

In [32]:
spark_df = spark.createDataFrame(df)

In [33]:
spark_df.show()

+-------------+--------+
|Temperature_C|Humidity|
+-------------+--------+
|        23.25|   32.28|
|         20.2|    49.0|
|         35.0|   77.27|
|        22.14|   48.56|
|        28.28|   82.33|
|        31.67|   53.59|
|        31.32|   60.72|
|        23.25|   73.61|
|        34.58|   65.37|
|        24.98|   41.08|
|        25.16|    65.7|
|        24.69|   40.31|
+-------------+--------+



In [34]:
spark_df.printSchema()

root
 |-- Temperature_C: double (nullable = true)
 |-- Humidity: double (nullable = true)



In [35]:
# Like a SELECT statement from SQL, Like filtering in Pandas...
spark_df.select(['Temperature_C']).show()

# Printing summary of Spark dataframe
spark_df.describe().show()

+-------------+
|Temperature_C|
+-------------+
|        23.25|
|         20.2|
|         35.0|
|        22.14|
|        28.28|
|        31.67|
|        31.32|
|        23.25|
|        34.58|
|        24.98|
|        25.16|
|        24.69|
+-------------+

+-------+------------------+------------------+
|summary|     Temperature_C|          Humidity|
+-------+------------------+------------------+
|  count|                12|                12|
|   mean|27.043333333333337| 57.48500000000001|
| stddev| 4.993907803665898|15.883322013414517|
|    min|              20.2|             32.28|
|    max|              35.0|             82.33|
+-------+------------------+------------------+



In [44]:
# Adding temperature in Farenheit
spark_df.withColumn('Temperature_F', spark_df.Temperature_C * 1.8 + 32).show()

# spark_df.select(['Temperature_C', 'Temperature_F']).show()

+-------------+--------+-----------------+
|Temperature_C|Humidity|    Temperature_F|
+-------------+--------+-----------------+
|        23.25|   32.28|            73.85|
|         20.2|    49.0|            68.36|
|         35.0|   77.27|             95.0|
|        22.14|   48.56|           71.852|
|        28.28|   82.33|           82.904|
|        31.67|   53.59|           89.006|
|        31.32|   60.72|           88.376|
|        23.25|   73.61|            73.85|
|        34.58|   65.37|           94.244|
|        24.98|   41.08|           76.964|
|        25.16|    65.7|77.28800000000001|
|        24.69|   40.31|76.44200000000001|
+-------------+--------+-----------------+



In [46]:
from pyspark.sql.functions import col, udf
from pyspark.sql.types import DoubleType
import math

def calculate_heat_index(T, H):
    T_F = T * 1.8 + 32
    HI_F = -42.379 + 2.04901523 * T_F + 10.14333127 * H - 0.22475541 * T_F * H \
           - 0.00683783 * T_F**2 - 0.05481717 * H**2 + 0.00122874 * T_F**2 * H \
           + 0.00085282 * T_F * H**2 - 0.00000199 * T_F**2 * H**2
    HI_C = (HI_F - 32) * 5 / 9
    return HI_C

heat_index_udf = udf(calculate_heat_index, DoubleType())

In [47]:
spark_df.withColumn('realFeel_C', heat_index_udf(col('Temperature_C'), col('Humidity'))).show()

+-------------+--------+------------------+
|Temperature_C|Humidity|        realFeel_C|
+-------------+--------+------------------+
|        23.25|   32.28|  24.8900957115826|
|         20.2|    49.0|25.167423271235577|
|         35.0|   77.27|54.761525415134585|
|        22.14|   48.56|24.975717773182613|
|        28.28|   82.33| 33.21652398881414|
|        31.67|   53.59| 34.61404159041657|
|        31.32|   60.72|35.728184870092875|
|        23.25|   73.61| 24.01024547773431|
|        34.58|   65.37|46.391980987860045|
|        24.98|   41.08| 25.70160238238701|
|        25.16|    65.7|26.164984101267674|
|        24.69|   40.31| 25.55826723417512|
+-------------+--------+------------------+



In [28]:
# To end the session, let's be kind on our kernel :)

just_confirming = input("Hold up! Do you really want to end Spark sessio? 'Y'/'N'")
if just_confirming == 'Y': 
    print("Alright, ending Spark Session :'(")
    spark.stop()
elif just_confirming == 'N': print("That was a close call :)")
else: print("You may want to read code before executig ~_~")

Hold up! Do you really want to end Spark sessio? 'Y'/'N' Y


Alright, ending Spark Session :'(
