# PySpark Project

## From Dummy Data to Analysis

### SparkSession Creation

In [1]:
# Configuration for SageMaker
from pyspark.conf import SparkConf
from pyspark.sql import SparkSession
import sys,os
import gc


### Modules Import

In [2]:
import random
from datetime import timedelta, datetime

from pyspark.sql import functions as F
from pyspark.sql.functions import col, date_format, to_date, weekofyear, concat_ws, year, lit, when, row_number, length, substring, concat, expr, lpad, udf, split
from pyspark.sql.types import DateType, TimestampType, StringType, StructType, StructField, FloatType, IntegerType, LongType
from pyspark.sql.window import Window

# Spark configuration object; no options are set on it here.
conf = SparkConf()

# Application name; also reused to derive the output path.
job_name = 'DummyDataProjectAnalysis'

# Build the Spark session (or reuse an existing one) with Hive support enabled.
spark = (
    SparkSession.builder
    .config(conf=conf)
    .appName(job_name)
    .enableHiveSupport()
    .getOrCreate()
)

In [4]:
# Output location: the job name with a ".parquet" suffix.
# The suffix sits outside the replacement field so that no double quote is
# nested inside the f-string, which is a SyntaxError before Python 3.12.
output_path = f"{job_name}.parquet"

'DummyDataProjectAnalysis.parquet'

In [None]:
# Italian regions, one tuple per region: (region name, latitude, longitude).
# The tuple order matches the column names given to createDataFrame below.
regions_with_coords = [
    ("Abruzzo", 42.3512, 13.3984),
    ("Basilicata", 40.6394, 15.8055),
    ("Calabria", 38.9057, 16.5948),
    ("Campania", 40.8342, 14.2504),
    ("Emilia-Romagna", 44.4949, 11.3426),
    ("Friuli Venezia Giulia", 46.0637, 13.2376),
    ("Lazio", 41.9028, 12.4964),
    ("Liguria", 44.4115, 8.9327),
    ("Lombardia", 45.4668, 9.1905),
    ("Marche", 43.6167, 13.5189),
    ("Molise", 41.5594, 14.6551),
    ("Piemonte", 45.0703, 7.6869),
    ("Puglia", 41.1256, 16.8666),
    ("Sardegna", 40.1209, 9.0129),
    ("Sicilia", 37.6000, 14.0154),
    ("Toscana", 43.7711, 11.2486),
    ("Trentino-Alto Adige", 46.4993, 11.3566),
    ("Umbria", 43.1068, 12.3888),
    ("Valle d'Aosta", 45.7373, 7.3201),
    ("Veneto", 45.4408, 12.3155)
]

# One row per region with columns region, region_latitude, region_longitude.
df_regions = spark.createDataFrame(regions_with_coords , ["region", "region_latitude","region_longitude"])


In [None]:
# Municipalities grouped by region.
# Keys are the same region names used in regions_with_coords; each value is a
# list of (municipality name, latitude, longitude) tuples.

municipalities_with_coords = {
    "Abruzzo": [
        ("L'Aquila", 42.3499, 13.3995),
        ("Pescara", 42.4643, 14.2142),
        ("Chieti", 42.3512, 14.1675),
        ("Teramo", 42.6612, 13.6984),
        ("Sulmona", 42.0484, 13.9273)
    ],
    "Basilicata": [
        ("Potenza", 40.6395, 15.8052),
        ("Matera", 40.6698, 16.6046),
        ("Melfi", 40.9981, 15.6524),
        ("Policoro", 40.2109, 16.6687),
        ("Pisticci", 40.3844, 16.4889)
    ],
    "Calabria": [
        ("Reggio Calabria", 38.1105, 15.6613),
        ("Catanzaro", 38.9106, 16.5877),
        ("Cosenza", 39.3088, 16.2535),
        ("Lamezia Terme", 38.9633, 16.3081),
        ("Vibo Valentia", 38.6764, 16.1002)
    ],
    "Campania": [
        ("Naples", 40.8518, 14.2681),
        ("Salerno", 40.6824, 14.7681),
        ("Caserta", 41.0747, 14.3313),
        ("Benevento", 41.1291, 14.7829),
        ("Avellino", 40.9142, 14.7933)
    ],
    "Emilia-Romagna": [
        ("Bologna", 44.4949, 11.3426),
        ("Modena", 44.6471, 10.9252),
        ("Parma", 44.8015, 10.3279),
        ("Reggio Emilia", 44.6989, 10.6298),
        ("Rimini", 44.0678, 12.5695)
    ],
    "Friuli Venezia Giulia": [
        ("Trieste", 45.6495, 13.7768),
        ("Udine", 46.0656, 13.2354),
        ("Pordenone", 45.9560, 12.6564),
        ("Gorizia", 45.9406, 13.6207),
        ("Monfalcone", 45.8095, 13.5336)
    ],
    "Lazio": [
        ("Rome", 41.9028, 12.4964),
        ("Latina", 41.4676, 12.9037),
        ("Frosinone", 41.6412, 13.3514),
        ("Viterbo", 42.4165, 12.1077),
        ("Rieti", 42.4026, 12.8616)
    ],
    "Liguria": [
        ("Genoa", 44.4115, 8.9327),
        ("La Spezia", 44.1025, 9.8241),
        ("Savona", 44.3091, 8.4771),
        ("Imperia", 43.8897, 8.0395),
        ("Sanremo", 43.8170, 7.7774)
    ],
    "Lombardia": [
        ("Milan", 45.4668, 9.1905),
        ("Bergamo", 45.6983, 9.6773),
        ("Brescia", 45.5416, 10.2118),
        ("Como", 45.8081, 9.0852),
        ("Monza", 45.5845, 9.2744)
    ],
    "Marche": [
        ("Ancona", 43.6167, 13.5189),
        ("Pesaro", 43.9078, 12.9132),
        ("Urbino", 43.7252, 12.6365),
        ("Macerata", 43.3016, 13.4537),
        ("Ascoli Piceno", 42.8619, 13.5769)
    ],
    "Molise": [
        ("Campobasso", 41.5595, 14.6664),
        ("Isernia", 41.5909, 14.2306),
        ("Termoli", 42.0003, 14.9948),
        ("Venafro", 41.4870, 14.0506),
        ("Bojano", 41.4872, 14.4755)
    ],
    "Piemonte": [
        ("Turin", 45.0703, 7.6869),
        ("Alessandria", 44.9126, 8.6194),
        ("Novara", 45.4404, 8.6212),
        ("Asti", 44.8999, 8.2068),
        ("Cuneo", 44.3849, 7.5422)
    ],
    "Puglia": [
        ("Bari", 41.1256, 16.8666),
        ("Taranto", 40.4644, 17.2470),
        ("Lecce", 40.3520, 18.1696),
        ("Foggia", 41.4622, 15.5446),
        ("Brindisi", 40.6328, 17.9402)
    ],
    "Sardegna": [
        ("Cagliari", 39.2238, 9.1217),
        ("Sassari", 40.7259, 8.5550),
        ("Olbia", 40.9230, 9.4867),
        ("Oristano", 39.9018, 8.5916),
        ("Nuoro", 40.3212, 9.3278)
    ],
    "Sicilia": [
        ("Palermo", 38.1157, 13.3615),
        ("Catania", 37.5079, 15.0830),
        ("Messina", 38.1938, 15.5540),
        ("Syracuse", 37.0755, 15.2866),
        ("Agrigento", 37.3094, 13.5857)
    ],
    "Toscana": [
        ("Florence", 43.7696, 11.2558),
        ("Pisa", 43.7228, 10.4017),
        ("Siena", 43.3188, 11.3308),
        ("Livorno", 43.5485, 10.3106),
        ("Lucca", 43.8424, 10.5035)
    ],
    "Trentino-Alto Adige": [
        ("Trento", 46.0667, 11.1215),
        ("Bolzano", 46.4983, 11.3548),
        ("Merano", 46.6707, 11.1597),
        ("Rovereto", 45.8919, 11.0408),
        ("Bressanone", 46.7164, 11.6566)
    ],
    "Umbria": [
        ("Perugia", 43.1107, 12.3908),
        ("Terni", 42.5636, 12.6427),
        ("Foligno", 42.9508, 12.7018),
        ("Spoleto", 42.7397, 12.7340),
        ("Assisi", 43.0707, 12.6196)
    ],
    "Valle d'Aosta": [
        ("Aosta", 45.7373, 7.3201),
        ("Saint-Vincent", 45.7520, 7.6465),
        ("Courmayeur", 45.7919, 6.9653),
        ("Gressoney-Saint-Jean", 45.7769, 7.8275),
        ("Châtillon", 45.7450, 7.6143)
    ],
    "Veneto": [
        ("Venice", 45.4408, 12.3155),
        ("Verona", 45.4384, 10.9916),
        ("Padua", 45.4064, 11.8768),
        ("Vicenza", 45.5455, 11.5409),
        ("Treviso", 45.6669, 12.2425)
    ]
}


# Flatten the region -> municipalities mapping into one
# (region, municipality, latitude, longitude) tuple per municipality.
data = [
    (region_name, *town)
    for region_name, towns in municipalities_with_coords.items()
    for town in towns
]

# Column names, in the same order as the tuple fields above.
columns = [
    "region",
    "municipality",
    "municipality_latitude",
    "municipality_longitude",
]
df_municipalities = spark.createDataFrame(data, schema=columns)

# Preview the first 20 rows without truncating long values.
df_municipalities.show(n=20, truncate=False)



In [None]:
# Combine regions and municipalities on the region name, keeping the region
# coordinates followed by the municipality name and coordinates.
df_geography = (
    df_regions
    .join(df_municipalities, df_regions.region == df_municipalities.region)
    .select(
        df_regions.region,
        df_regions.region_latitude,
        df_regions.region_longitude,
        df_municipalities.municipality,
        df_municipalities.municipality_latitude,
        df_municipalities.municipality_longitude,
    )
)

# Preview the first 20 joined rows.
df_geography.show(20)


### Data Creation

In [None]:
def generate_dates(start_date, num_days):
    """Yield ``num_days`` consecutive days, starting at ``start_date``.

    Each item is a one-element tuple holding the day formatted as
    ``YYYY-MM-DD``, so every item can be used directly as a single-column row.
    """
    day_offsets = (timedelta(days=n) for n in range(num_days))
    for offset in day_offsets:
        current_day = start_date + offset
        yield (current_day.strftime("%Y-%m-%d"),)

# First day of the generated series.
start_date = datetime.strptime("2018-06-03", "%Y-%m-%d")

# Generator over 60 consecutive days starting at start_date.
dates = generate_dates(start_date, 60)

# Single-column DataFrame ("dates") built from the generated rows.
df = spark.createDataFrame(dates, schema=["dates"])

In [None]:
# Pair every generated date with every row of df_geography (Cartesian product).

df = df.crossJoin(df_geography)

# Optional spot check, left disabled: df.filter(df.Region == "Basilicata").show(20)

In [None]:
def generate_time():
    """Return a random time of day as a zero-padded ``HH:MM:SS`` string."""
    hours = random.randint(0, 23)
    minutes = random.randint(0, 59)
    seconds = random.randint(0, 59)
    return "{:02d}:{:02d}:{:02d}".format(hours, minutes, seconds)

# Spark UDF wrapping generate_time; produces a string column.
# Marked non-deterministic because the function returns a new random value on
# every call: Spark treats UDFs as deterministic by default and may otherwise
# duplicate or eliminate invocations during optimization.
time_udf = udf(generate_time, StringType()).asNondeterministic()

def generate_latency():
    """Return a random latency in milliseconds, an integer from 0 to 100 inclusive."""
    min_ms, max_ms = 0, 100
    return random.randint(min_ms, max_ms)

# Spark UDF wrapping generate_latency; produces an integer column.
# Marked non-deterministic because the function returns a new random value on
# every call: Spark treats UDFs as deterministic by default and may otherwise
# duplicate or eliminate invocations during optimization.
latency_udf = udf(generate_latency, IntegerType()).asNondeterministic()

def generate_throughput():
    """Return a random throughput in Mbps, an integer from 10 to 1000 inclusive."""
    min_mbps, max_mbps = 10, 1000
    return random.randint(min_mbps, max_mbps)

# Spark UDF wrapping generate_throughput; produces an integer column.
# Marked non-deterministic because the function returns a new random value on
# every call: Spark treats UDFs as deterministic by default and may otherwise
# duplicate or eliminate invocations during optimization.
throughput_udf = udf(generate_throughput, IntegerType()).asNondeterministic()

def generate_signal_strength():
    """Return a random signal strength in dBm, an integer from -140 to -44 inclusive."""
    weakest_dbm, strongest_dbm = -140, -44
    return random.randint(weakest_dbm, strongest_dbm)

# Spark UDF wrapping generate_signal_strength; produces an integer column.
# Marked non-deterministic because the function returns a new random value on
# every call: Spark treats UDFs as deterministic by default and may otherwise
# duplicate or eliminate invocations during optimization.
signal_strength_udf = udf(generate_signal_strength, IntegerType()).asNondeterministic()

def generate_packet_loss_rate():
    """Return a random packet loss percentage between 0.0 and 5.0, rounded to 2 decimals."""
    loss_percent = random.uniform(0.0, 5.0)
    return round(loss_percent, 2)

# Spark UDF wrapping generate_packet_loss_rate; produces a float column.
# Marked non-deterministic because the function returns a new random value on
# every call: Spark treats UDFs as deterministic by default and may otherwise
# duplicate or eliminate invocations during optimization.
packet_loss_rate_udf = udf(generate_packet_loss_rate, FloatType()).asNondeterministic()

def generate_jitter():
    """Return a random jitter in milliseconds between 0.0 and 50.0, rounded to 2 decimals."""
    jitter_ms = random.uniform(0.0, 50.0)
    return round(jitter_ms, 2)

# Spark UDF wrapping generate_jitter; produces a float column.
# Marked non-deterministic because the function returns a new random value on
# every call: Spark treats UDFs as deterministic by default and may otherwise
# duplicate or eliminate invocations during optimization.
jitter_udf = udf(generate_jitter, FloatType()).asNondeterministic()

def generate_network_utilization():
    """Return a random network utilization percentage, an integer from 0 to 100 inclusive."""
    min_percent, max_percent = 0, 100
    return random.randint(min_percent, max_percent)

# Spark UDF wrapping generate_network_utilization; produces an integer column.
# Marked non-deterministic because the function returns a new random value on
# every call: Spark treats UDFs as deterministic by default and may otherwise
# duplicate or eliminate invocations during optimization.
network_utilization_udf = udf(generate_network_utilization, IntegerType()).asNondeterministic()

def generate_data_volume():
    """Return a random data volume in megabytes, an integer from 1 to 10000 inclusive."""
    min_mb, max_mb = 1, 10000
    return random.randint(min_mb, max_mb)

# Spark UDF wrapping generate_data_volume; produces a long (64-bit integer) column.
# Marked non-deterministic because the function returns a new random value on
# every call: Spark treats UDFs as deterministic by default and may otherwise
# duplicate or eliminate invocations during optimization.
volume_udf = udf(generate_data_volume, LongType()).asNondeterministic()

In [None]:
# Enrich every (date, municipality) row with randomly generated network metrics.
# Steps, in order:
#   1. parse the "dates" strings into a date column;
#   2. add one column per metric UDF, plus a random "time" of day;
#   3. derive "hour" (the HH part of "time") and "day_time" (date + time as a timestamp);
#   4. drop the helper "time" column once both derived columns exist.
df = (
    df.withColumn("dates", to_date("dates"))
    .withColumn("latency", latency_udf())
    .withColumn("throughput", throughput_udf())
    .withColumn("signal_strengh", signal_strength_udf())
    .withColumn("packet_loss", packet_loss_rate_udf())
    .withColumn("jitter", jitter_udf())
    .withColumn("network_utilization", network_utilization_udf())
    .withColumn("volume", volume_udf())
    .withColumn("time", time_udf())
    .withColumn("hour", split(col("time"), ":").getItem(0).cast(IntegerType()))
    .withColumn(
        "day_time",
        concat(col("dates"), lit(" "), col("time")).cast(TimestampType()),
    )
    .drop("time")
)

# Visual check only.
df.show(10)

In [None]:
# Print the DataFrame schema as a visual check of the columns added so far
df.printSchema()

In [None]:
# Put the columns in their final order: time columns first, then geography,
# then the network metrics.
# NOTE(review): "signal_strengh" is spelled as it was when the column was created.
df = df.select(
    "dates",
    "day_time",
    "hour",
    "region",
    "region_latitude",
    "region_longitude",
    "municipality",
    "municipality_latitude",
    "municipality_longitude",
    "latency",
    "throughput",
    "signal_strengh",
    "packet_loss",
    "jitter",
    "network_utilization",
    "volume",
)

# Visual check only.
df.show(10, truncate=False)

In [None]:
# Schema enforcement: rebuild the DataFrame against an explicit target schema.
# Every field is declared nullable.
# NOTE(review): the incoming column is named "signal_strengh" while the schema
# says "signal_strength"; presumably the schema is applied by position, so the
# column ends up renamed. Confirm.
# NOTE(review): "volume" is produced by a LongType UDF but declared IntegerType
# here. The generated values (1..10000) fit; confirm the narrowing is intended.
final_schema = schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in (
        ("dates", DateType()),
        ("day_time", TimestampType()),
        ("hour", IntegerType()),
        ("region", StringType()),
        ("region_latitude", FloatType()),
        ("region_longitude", FloatType()),
        ("municipality", StringType()),
        ("municipality_latitude", FloatType()),
        ("municipality_longitude", FloatType()),
        ("latency", IntegerType()),
        ("throughput", IntegerType()),
        ("signal_strength", IntegerType()),
        ("packet_loss", FloatType()),
        ("jitter", FloatType()),
        ("network_utilization", IntegerType()),
        ("volume", IntegerType()),
    )
])

# Re-create the DataFrame from its underlying RDD using the target schema.
df = spark.createDataFrame(df.rdd, schema=final_schema)

# Visual check only: first 20 rows for one region.
df.where(col("region") == "Basilicata").show(20)

In [None]:
# Persist the dataset as Parquet under output_path, partitioned by the "dates"
# column. Append mode adds to whatever is already stored there, so re-running
# this cell writes the data again.
(
    df.write
    .mode("append")
    .format("parquet")
    .partitionBy("dates")
    .save(output_path)
)
