In [3]:
# Imports
from pyspark.sql.functions import col, lit, expr, when, to_timestamp
from pyspark.sql.types import *
from datetime import datetime
import time
import pyspark
from pyspark.sql import SparkSession
import random
import string
import pandas as pd
from pyspark.sql.functions import avg
import plotly.express as px


In [4]:
# ===== Dataset size: number of random cities appended to each country's seed data =====
# Only read by the (commented-out) augmentation cell that produces the CSVs.
DataLength = 1_000_000

In [3]:
#ORIGINAL DATASETS
#German Cities Data
german_cities_data = {
    "CityID": [1, 2, 3, 4, 5],
    "CityName": ["Berlin", "Hamburg", "Munich", "Cologne", "Frankfurt"],
    "Population": [3520000, 1790000, 1450000, 1060000, 733000],
    "Area": [891.3, 755.2, 310.7, 405.15, 500],
    "CulturalSites": [7, 6, 3, 5, 3]  
}

# French Cities Data
french_cities_data = {
    "CityID": [1, 2, 3, 4, 5],
    "CityName": ["Paris", "Marseille", "Lyon", "Toulouse", "Nice"],
    "Population": [2250000, 851000, 491000, 447000, 344000],
    "Area": [250, 240.6, 47.87, 118.3, 71.9],      
    "CulturalSites": [7, 7, 1, 4, 3]
}

In [4]:

def generate_city_name(length):
    """Return a random string of `length` ASCII letters (upper and lower case).

    Uses one `random.choice` draw per character from the module-level `random`
    generator, so the result is reproducible after `random.seed(...)`.
    """
    alphabet = string.ascii_letters
    picked = [random.choice(alphabet) for _ in range(length)]
    return "".join(picked)

def generate_city_data(num_cities):
    """Build `num_cities` random city records as a list of dicts.

    Each record has the keys CityID (sequential, starting at 1), CityName
    (4-6 random letters), Population, Area (rounded to 2 decimals),
    TouristVisits and CulturalSites. TouristVisits is generated here, but the
    augmentation cell only copies the other fields.
    """
    # Dict-literal values are evaluated left to right, so the random draws
    # happen in the same order as building each field step by step.
    return [
        {
            "CityID": city_id,
            "CityName": generate_city_name(random.randint(4, 6)),
            "Population": random.randint(350, 5000000),
            "Area": round(random.uniform(170, 5000), 2),
            "TouristVisits": random.randint(3000000, 6000000),
            "CulturalSites": random.randint(2, 7),
        }
        for city_id in range(1, num_cities + 1)
    ]



In [5]:
# # Augment premade German cities data
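# Appends DataLength random cities to each seed dict (these are dicts, not DataFrames yet).
# NOTE: dict.copy() is shallow, so extending the lists also extends german_cities_data /
# french_cities_data themselves; the export cells below use those original names and rely on this.
# Each column also calls generate_city_data(DataLength) separately, so the data is generated four times.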
# german_cities_augmented = german_cities_data.copy()
# german_cities_augmented["CityID"].extend(range(6, DataLength+6))
# german_cities_augmented["CityName"].extend([city_data["CityName"] for city_data in generate_city_data(DataLength)])
# german_cities_augmented["Population"].extend([city_data["Population"] for city_data in generate_city_data(DataLength)])
# german_cities_augmented["Area"].extend([city_data["Area"] for city_data in generate_city_data(DataLength)])
# german_cities_augmented["CulturalSites"].extend([city_data["CulturalSites"] for city_data in generate_city_data(DataLength)])

# # Augment premade French cities data
# french_cities_augmented = french_cities_data.copy()
# french_cities_augmented["CityID"].extend(range(6, DataLength+6))
# french_cities_augmented["CityName"].extend([city_data["CityName"] for city_data in generate_city_data(DataLength)])
# french_cities_augmented["Population"].extend([city_data["Population"] for city_data in generate_city_data(DataLength)])
# french_cities_augmented["Area"].extend([city_data["Area"] for city_data in generate_city_data(DataLength)])
# french_cities_augmented["CulturalSites"].extend([city_data["CulturalSites"] for city_data in generate_city_data(DataLength)])


In [6]:
# #MAKE DATAFRAMES using pandas 
# df_german_cities = pd.DataFrame(german_cities_data)
# df_french_cities = pd.DataFrame(french_cities_data)

In [7]:
# # Save to CSV files: ===================== !GRADER YOU WILL NEED TO UNCOMMENT THIS TO RUN CODE ON YOUR MACHINE!

# df_german_cities.to_csv('german_cities.csv', index=False)
# df_french_cities.to_csv('french_cities.csv', index=False) 

In [5]:
# Locations of the CSVs written by the export cell (relative to the working directory).
frenchpath = "french_cities.csv"
Germpath = "german_cities.csv"

In [6]:
# Get the active Spark session, or start one named "Assignment 1".
spark = (
    SparkSession.builder
    .appName("Assignment 1")
    .getOrCreate()
)

In [7]:
# Explicit schema for the French cities CSV, so numeric columns load as numbers
# instead of strings.
# NOTE(review): Spark file sources generally treat columns as nullable, so
# nullable=False documents intent rather than being enforced at read time.
F_schema = StructType([
    StructField("CityID", IntegerType(), nullable=False),
    StructField("CityName", StringType(), nullable=False),
    StructField("Population", IntegerType(), nullable=False),
    StructField("Area", DoubleType(), nullable=False),
    StructField("CulturalSites", IntegerType(), nullable=False)
])

# The CSV begins with a header line (pandas to_csv writes one by default).
# Without header=True, Spark parsed that line as data, producing a row whose
# numeric columns were null.
french_cities = spark.read.format("csv").load(frenchpath, schema=F_schema, header=True)

In [8]:
# Explicit schema for the German cities CSV, so numeric columns load as numbers
# instead of strings.
# NOTE(review): Spark file sources generally treat columns as nullable, so
# nullable=False documents intent rather than being enforced at read time.
G_schema = StructType([
    StructField("CityID", IntegerType(), nullable=False),
    StructField("CityName", StringType(), nullable=False),
    StructField("Population", IntegerType(), nullable=False),
    StructField("Area", DoubleType(), nullable=False),
    StructField("CulturalSites", IntegerType(), nullable=False)
])

# The CSV begins with a header line (pandas to_csv writes one by default).
# Without header=True, Spark parsed that line as data, producing a row whose
# numeric columns were null.
german_cities = spark.read.format("csv").load(Germpath, schema=G_schema, header=True)

In [9]:
# Remove every row that contains a null. When a CSV is read without
# header=True, its header line becomes a data row whose numeric fields fail to
# parse and come back null; this drops that row along with any other row that
# failed to parse.
german_cities = german_cities.na.drop(how="any")
french_cities = french_cities.na.drop(how="any")

In [10]:
#ALEX Q2
#Pyspark Dataframe Operation
# City name, area and population for each country, largest area first.
# Keep the DataFrames and call .show() separately: show() only prints and
# returns None, so assigning its result left these variables as None.
germ_city_sizes = german_cities.select("CityName", "Area", "Population").orderBy(col("Area").desc())
french_city_sizes = french_cities.select("CityName", "Area", "Population").orderBy(col("Area").desc())
germ_city_sizes.show()
french_city_sizes.show()
# With 1M random rows per country, the largest areas in both datasets sit at
# the generator's 5000 upper bound. This ordering therefore does not show one
# country's cities being larger than the other's.
#============================================

+--------+-------+----------+
|CityName|   Area|Population|
+--------+-------+----------+
|    Brpk| 5000.0|    191432|
|  cBgzzc|4999.98|   1289866|
|  AteeRA|4999.98|   1671934|
|    MAqq|4999.98|   3186545|
|    ZimV|4999.98|   3390964|
|  fKfxif|4999.96|   1210617|
|    Iuww|4999.95|   2500533|
|   HpzNW|4999.95|   3627489|
|  HYUuaO|4999.94|   2027121|
|   foZww|4999.94|   4023043|
|   BCEeu|4999.94|    939649|
|    drFn|4999.94|   4974197|
|    Mycw|4999.93|   3362677|
|   tnOVK|4999.92|   1104528|
|    ENaA|4999.92|   2308746|
|   CJowj|4999.91|   3651452|
|  sVBbsm|4999.91|   4239354|
|  qIObjO| 4999.9|   2416419|
|   cwoBa|4999.89|   3540298|
|   eUZMg|4999.89|   2817431|
+--------+-------+----------+
only showing top 20 rows

+--------+-------+----------+
|CityName|   Area|Population|
+--------+-------+----------+
|    NNzT| 5000.0|   3565660|
|   KThrN| 5000.0|   4646191|
|  GbmkVy| 5000.0|    185132|
|    DkIJ|4999.98|   4201978|
|  awYDYU|4999.98|    580237|
|  duXZXo|4999

In [11]:
#Alex Q2
#SQL Version
# Register both DataFrames as temp views. The later SQL cells in this notebook
# query these same view names.
for view_name, cities_df in (("german_cities", german_cities), ("french_cities", french_cities)):
    cities_df.createOrReplaceTempView(view_name)

# NOTE(review): the saved output of this cell is a Py4JError
# ("Method sql([String, Object[]]) does not exist"). That typically means the
# pyspark package is newer than the Spark JVM it talks to; confirm versions.

# German cities, largest area first
germ_city_sizes = spark.sql("""
    
    SELECT CityName, Area, Population 
    FROM german_cities 
    ORDER BY Area DESC

""")
germ_city_sizes.show()

# French cities, largest area first
french_city_sizes = spark.sql("""
    
    SELECT CityName, Area, Population 
    FROM french_cities 
    ORDER BY Area DESC

""")
french_city_sizes.show()

Py4JError: An error occurred while calling o24.sql. Trace:
py4j.Py4JException: Method sql([class java.lang.String, class [Ljava.lang.Object;]) does not exist
	at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
	at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
	at py4j.Gateway.invoke(Gateway.java:274)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:833)



In [None]:
#q3 paul
# Summary statistics per country: total/min/max population, mean area and
# number of cities, computed with the DataFrame API and again with Spark SQL.

# Import the functions module under an alias. `from pyspark.sql.functions import
# sum, min, max` would shadow Python's built-in sum/min/max for every cell that
# runs after this one.
from pyspark.sql import functions as F


def summarize_cities(cities_df):
    """Return a one-row DataFrame of population/area aggregates for `cities_df`.

    Reads the Population, Area and CityID columns. The output columns are
    TotalPopulation, AverageArea, MinPopulation, MaxPopulation and CityCount.
    """
    return cities_df.agg(
        F.sum("Population").alias("TotalPopulation"),
        F.avg("Area").alias("AverageArea"),
        F.min("Population").alias("MinPopulation"),
        F.max("Population").alias("MaxPopulation"),
        F.count("CityID").alias("CityCount"),
    )


#pyspark solutions
print("PySpark Solutions:")

#aggregate for german cities
germany_agg = summarize_cities(german_cities)
germany_agg.show()

#aggregate for french cities
france_agg = summarize_cities(french_cities)
france_agg.show()

#sql solutions (query the temp views registered in the Q2 SQL cell)
print("\nSQL Solutions:")

#aggregate for german cities
germany_agg_sql = spark.sql("""
    SELECT SUM(Population) AS TotalPopulation,
           AVG(Area) AS AverageArea,
           MIN(Population) AS MinPopulation,
           MAX(Population) AS MaxPopulation,
           COUNT(CityID) AS CityCount
    FROM german_cities
""")
germany_agg_sql.show()

#aggregate for french cities
france_agg_sql = spark.sql("""
    SELECT SUM(Population) AS TotalPopulation,
           AVG(Area) AS AverageArea,
           MIN(Population) AS MinPopulation,
           MAX(Population) AS MaxPopulation,
           COUNT(CityID) AS CityCount
    FROM french_cities
""")
france_agg_sql.show()


In [15]:
#Q5 Charan
# Full outer join on CityID: each ID present in either dataset appears once,
# with nulls on the side that has no city for that ID. CityID is a sequential
# synthetic ID in both datasets, so the paired cities are otherwise unrelated.
full_outer_join_df = (
    german_cities
    .join(french_cities, on="CityID", how="full_outer")
    .select(
        german_cities["CityName"].alias("GermanCityName"),
        french_cities["CityName"].alias("FrenchCityName"),
        german_cities["Population"].alias("PopulationInGermany"),
        french_cities["Population"].alias("PopulationInFrance"),
    )
)

full_outer_join_df.show()


+--------------+--------------+-------------------+------------------+
|GermanCityName|FrenchCityName|PopulationInGermany|PopulationInFrance|
+--------------+--------------+-------------------+------------------+
|         yPGeN|          Arrm|            3898854|           1533884|
|         BqIqF|        YpXVTA|            2626349|           1677669|
|        oPMhti|         iCIMy|            2258897|           1116640|
|         Gqdcx|         Ppsnh|            4242141|            634336|
|         tIutr|          DEsQ|             446469|           2698268|
|         kYnhs|        bbpzqc|            4848747|           1554811|
|        cVYwaO|         QxElg|            4181455|           1303426|
|          CjDj|          Xqjr|            4353703|           4622090|
|         ogzIT|        wHzKXB|             216939|            671899|
|         HFOnE|          VneL|            1345457|           4164036|
|          Lszg|          qvmt|            2713054|           1789120|
|     

In [16]:
#Q5 SQL
# Spark SQL version of the Q5 full outer join, run against the temp views
# registered in the Q2 SQL cell; it yields the same four columns.
query = """
SELECT 
    g.CityName AS GermanCityName,
    f.CityName AS FrenchCityName,
    g.Population AS PopulationInGermany,
    f.Population AS PopulationInFrance
FROM german_cities g
FULL OUTER JOIN french_cities f ON g.CityID = f.CityID
"""

full_outer_join_df = spark.sql(query)
full_outer_join_df.show()


+--------------+--------------+-------------------+------------------+
|GermanCityName|FrenchCityName|PopulationInGermany|PopulationInFrance|
+--------------+--------------+-------------------+------------------+
|         yPGeN|          Arrm|            3898854|           1533884|
|         BqIqF|        YpXVTA|            2626349|           1677669|
|        oPMhti|         iCIMy|            2258897|           1116640|
|         Gqdcx|         Ppsnh|            4242141|            634336|
|         tIutr|          DEsQ|             446469|           2698268|
|         kYnhs|        bbpzqc|            4848747|           1554811|
|        cVYwaO|         QxElg|            4181455|           1303426|
|          CjDj|          Xqjr|            4353703|           4622090|
|         ogzIT|        wHzKXB|             216939|            671899|
|         HFOnE|          VneL|            1345457|           4164036|
|          Lszg|          qvmt|            2713054|           1789120|
|     

In [17]:
#ALEX Q6
#Pyspark Dataframe operation
# Average of every numeric column, printed under each country's label.
for country_label, cities_df in (("Germany", german_cities), ("France", french_cities)):
    print(country_label)
    cities_df.select(
        avg("CityID").alias("Avg_CityID"),
        avg("Population").alias("Avg_Population"),
        avg("Area").alias("Avg_Area"),
        avg("CulturalSites").alias("Avg_CulturalSites")
    ).show()

# The two countries come out almost identical. That is expected: all but the
# first five rows of each dataset come from the same random generator. Germany
# is marginally ahead on average population and area, while France is marginally
# ahead on cultural sites. Avg_CityID only reflects the ID numbering.
#This makes sense that both would be similar as they are randomly generated via the same parameters


Germany
+----------+------------------+------------------+-----------------+
|Avg_CityID|    Avg_Population|          Avg_Area|Avg_CulturalSites|
+----------+------------------+------------------+-----------------+
|  500003.0|2499948.9589232053|2585.7041581891945|4.499701501492493|
+----------+------------------+------------------+-----------------+

France
+----------+------------------+------------------+-----------------+
|Avg_CityID|    Avg_Population|          Avg_Area|Avg_CulturalSites|
+----------+------------------+------------------+-----------------+
|  500003.0|2499667.3588442057|2584.0123058284817| 4.50078249608752|
+----------+------------------+------------------+-----------------+



In [18]:
#Alex Q6
#SQL Version
# Spark SQL version of the Q6 averages, run against the temp views registered
# in the Q2 SQL cell.

# Germany
avg_german_stats = spark.sql("""
    
    SELECT AVG(CityID) AS Avg_CityID,
           AVG(Population) AS Avg_Population,
           AVG(Area) AS Avg_Area,
           AVG(CulturalSites) AS Avg_CulturalSites
    FROM german_cities
    
""")
avg_german_stats.show()

# France
avg_french_stats = spark.sql("""

    SELECT AVG(CityID) AS Avg_CityID,
           AVG(Population) AS Avg_Population,
           AVG(Area) AS Avg_Area,
           AVG(CulturalSites) AS Avg_CulturalSites
    FROM french_cities
    
""")
avg_french_stats.show()


+----------+------------------+------------------+-----------------+
|Avg_CityID|    Avg_Population|          Avg_Area|Avg_CulturalSites|
+----------+------------------+------------------+-----------------+
|  500003.0|2499948.9589232053|2585.7041581891945|4.499701501492493|
+----------+------------------+------------------+-----------------+

+----------+------------------+------------------+-----------------+
|Avg_CityID|    Avg_Population|          Avg_Area|Avg_CulturalSites|
+----------+------------------+------------------+-----------------+
|  500003.0|2499667.3588442057|2584.0123058284817| 4.50078249608752|
+----------+------------------+------------------+-----------------+



In [19]:
#Q4 Srinivas
# Add a Density column (Population / Area) to each country's DataFrame, then
# display it next to the city name.
germany_density = german_cities.withColumn("Density", col("Population") / col("Area"))
france_density = french_cities.withColumn("Density", col("Population") / col("Area"))

for density_df in (germany_density, france_density):
    density_df.select("CityName", "Density").show()


+---------+------------------+
| CityName|           Density|
+---------+------------------+
|   Berlin| 3949.287557500281|
|  Hamburg|2370.2330508474574|
|   Munich| 4666.881235918893|
|  Cologne|2616.3149450820683|
|Frankfurt|            1466.0|
|     tfOB| 498.2321165493534|
|     uBMt|3756.3574614760746|
|     fpbw|104.04242996776689|
|     rOXV| 547.7717676827241|
|    dApbQ| 562.0234644967117|
|   zIXZbo|1049.9719393890805|
|   WoiTtZ|1091.6647137835444|
|     AtMM| 787.1854868654482|
|    EfLQd|16940.895207419337|
|   iKbycv| 499.0313445086107|
|     bQQH|  2379.42325951114|
|     DYiw|1077.3815788611744|
|    qzQcV| 4364.230797090202|
|    XozDq| 866.3320888244964|
|   KvnMpM|24.474152204574725|
+---------+------------------+
only showing top 20 rows

+---------+------------------+
| CityName|           Density|
+---------+------------------+
|    Paris|            9000.0|
|Marseille|3536.9908561928514|
|     Lyon|10256.945895132652|
| Toulouse| 3778.529163144548|
|     Nice| 4

In [20]:
# Spark SQL version of Q4: density (Population / Area) for every city, queried
# from the temp views registered in the Q2 SQL cell.

print("Germany")
density_german_stats = spark.sql("""
SELECT CityName, Population / Area AS Density
FROM german_cities

""")
density_german_stats.show()

print("France")
density_french_stats = spark.sql("""
SELECT CityName, Population / Area AS Density
FROM french_cities

""")
density_french_stats.show()



Germany
+---------+------------------+
| CityName|           Density|
+---------+------------------+
|   Berlin| 3949.287557500281|
|  Hamburg|2370.2330508474574|
|   Munich| 4666.881235918893|
|  Cologne|2616.3149450820683|
|Frankfurt|            1466.0|
|     tfOB| 498.2321165493534|
|     uBMt|3756.3574614760746|
|     fpbw|104.04242996776689|
|     rOXV| 547.7717676827241|
|    dApbQ| 562.0234644967117|
|   zIXZbo|1049.9719393890805|
|   WoiTtZ|1091.6647137835444|
|     AtMM| 787.1854868654482|
|    EfLQd|16940.895207419337|
|   iKbycv| 499.0313445086107|
|     bQQH|  2379.42325951114|
|     DYiw|1077.3815788611744|
|    qzQcV| 4364.230797090202|
|    XozDq| 866.3320888244964|
|   KvnMpM|24.474152204574725|
+---------+------------------+
only showing top 20 rows

France
+---------+------------------+
| CityName|           Density|
+---------+------------------+
|    Paris|            9000.0|
|Marseille|3536.9908561928514|
|     Lyon|10256.945895132652|
| Toulouse| 3778.529163144548

In [21]:

# Bar chart of cultural-site counts for the 20 most populous French cities.
top_20_cities = french_cities.orderBy("Population", ascending=False).limit(20)

# Collect both columns in a single Spark job. Collecting them separately ran
# the sort + limit twice. Spark does not guarantee which tied rows survive a
# limit, so ties at the cutoff could pair names with the wrong counts.
top_rows = top_20_cities.select("CityName", "CulturalSites").collect()
city_names = [row.CityName for row in top_rows]
cultural_sites = [row.CulturalSites for row in top_rows]


data_dict = {'CityName': city_names, 'CulturalSites': cultural_sites}
df = pd.DataFrame(data_dict)
fig = px.bar(df, x="CityName", y="CulturalSites",
             title="Cultural Sites in Top 20 Populous French Cities")
fig.update_layout(xaxis_title="City", yaxis_title="Number of Cultural Sites")
fig.show()



In [22]:
# Bar chart of cultural-site counts for the 20 most populous German cities.
top_20_cities_german = german_cities.orderBy("Population", ascending=False).limit(20)

# Collect both columns in a single Spark job. Collecting them separately ran
# the sort + limit twice. Spark does not guarantee which tied rows survive a
# limit, so ties at the cutoff could pair names with the wrong counts.
top_rows_german = top_20_cities_german.select("CityName", "CulturalSites").collect()
city_names = [row.CityName for row in top_rows_german]
cultural_sites = [row.CulturalSites for row in top_rows_german]
data_dict = {'CityName': city_names, 'CulturalSites': cultural_sites}
df = pd.DataFrame(data_dict)
fig = px.bar(df, x="CityName", y="CulturalSites",
             title="Cultural Sites in Top 20 Populous German Cities")
fig.update_layout(xaxis_title="City", yaxis_title="Number of Cultural Sites")
fig.show()