### MONGODB

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, when, sum

import os

# Create (or reuse) a Spark session with the MongoDB Spark connector on the classpath.
# Credentials are read from the environment instead of being hard-coded in the notebook;
# the user/password that were previously committed here should be rotated.
# MONGODB_URI is expected to be the cluster URI without a trailing slash,
# e.g. "mongodb+srv://<user>:<password>@<host>".
mongodb_uri = os.environ["MONGODB_URI"]

spark1 = SparkSession.builder \
    .appName("MongoDBMflixAnalysis") \
    .config("spark.mongodb.input.uri", mongodb_uri) \
    .config("spark.mongodb.output.uri", mongodb_uri) \
    .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:3.0.1") \
    .getOrCreate()

# Read the Politics collection of the UFO database on MongoDB
politics_df = spark1.read.format("com.mongodb.spark.sql.DefaultSource") \
    .option("uri", f"{mongodb_uri}/UFO.Politics") \
    .load()

# Keep only the year, state and presidential-vote code columns
year_state = politics_df.select(politics_df["VCF0004"].alias("Year"),
                                politics_df["VCF0901b"].alias("State"),
                                politics_df["VCF0705"].alias("Presidential_Vote"))

# State code '99' means "no information": drop those records.
# No orderBy here; the groupBy below would discard any ordering anyway.
year_state = year_state.filter(col('State') != '99')

# Count each presidential-vote code per (year, state):
# "0" -> not voted, "1" -> Democrat, "2" -> Republican, "3" -> Other Party,
# plus the number of rows carrying any of those four codes.
vote = col("Presidential_Vote")
political_df = year_state.groupBy('year', 'state') \
    .agg(count(when(vote == "0", True)).alias("not voted"),
         count(when(vote == "1", True)).alias("Democrat"),
         count(when(vote == "2", True)).alias("Republican"),
         count(when(vote == "3", True)).alias("Other Party"),
         sum(when(vote.isin(["0", "1", "2", "3"]), 1).otherwise(0)).alias("total_votes")) \
    .orderBy('year', 'state', ascending=True)

print("Yearly Presidential Votes by State:")
political_df.show()

:: loading settings :: url = jar:file:/usr/local/spark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/azureuser/.ivy2/cache
The jars for the packages stored in: /home/azureuser/.ivy2/jars
org.mongodb.spark#mongo-spark-connector_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-647f1e7b-120d-45ee-af93-ebd104d124be;1.0
	confs: [default]
	found org.mongodb.spark#mongo-spark-connector_2.12;3.0.1 in central
	found org.mongodb#mongodb-driver-sync;4.0.5 in central
	found org.mongodb#bson;4.0.5 in central
	found org.mongodb#mongodb-driver-core;4.0.5 in central
:: resolution report :: resolve 257ms :: artifacts dl 10ms
	:: modules in use:
	org.mongodb#bson;4.0.5 from central in [default]
	org.mongodb#mongodb-driver-core;4.0.5 from central in [default]
	org.mongodb#mongodb-driver-sync;4.0.5 from central in [default]
	org.mongodb.spark#mongo-spark-connector_2.12;3.0.1 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   art

Yearly Presidential Votes by State:


[Stage 1:>                                                          (0 + 1) / 1]

+----+-----+---------+--------+----------+-----------+-----------+
|year|state|not voted|Democrat|Republican|Other Party|total_votes|
+----+-----+---------+--------+----------+-----------+-----------+
|1952|   AL|       39|      10|         9|          0|         58|
|1952|   AR|       24|      14|        14|          0|         52|
|1952|   CA|       52|      29|        62|          0|        143|
|1952|   CO|        7|      12|        12|          0|         31|
|1952|   CT|       11|      21|        24|          0|         56|
|1952|   DC|       14|       2|         0|          0|         16|
|1952|   FL|       26|       9|         5|          0|         40|
|1952|   GA|       32|      11|         5|          0|         48|
|1952|   IA|       21|      25|        48|          0|         94|
|1952|   ID|        6|       4|         6|          0|         16|
|1952|   IL|       12|      30|        35|          0|         77|
|1952|   IN|       12|      14|        15|          0|        

                                                                                

### Hadoop

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.functions import sum as spark_sum, col, year, count as spark_count

from pyspark.sql import functions as F

# Create (or reuse) the Spark session
spark2 = SparkSession.builder.appName("PollutionAnalysis").getOrCreate()

# Set the Hadoop file path
hadoop_path = "hdfs://localhost:9000/user/input/pollution/"

# Read the raw CSV from HDFS
pollution_raw_df = spark2.read.csv(hadoop_path + "pollution_us_2000_2016.csv", header=True, inferSchema=True)

# Dict of state abbreviation -> full state name
state_abbreviations = {
    'TX': 'Texas', 'HI': 'Hawaii', 'TN': 'Tennessee', 'CT': 'Connecticut', 'AL': 'Alabama', 'FL': 'Florida', 'CA': 'California',
    'NC': 'North Carolina', 'NY': 'New York', 'KY': 'Kentucky', 'MI': 'Michigan', 'MA': 'Massachusetts', 'KS': 'Kansas',
    'SC': 'South Carolina', 'WA': 'Washington', 'CO': 'Colorado', 'NH': 'New Hampshire', 'WI': 'Wisconsin', 'ME': 'Maine',
    'GA': 'Georgia', 'PA': 'Pennsylvania', 'IL': 'Illinois', 'AR': 'Arkansas', 'MO': 'Missouri', 'OH': 'Ohio', 'IN': 'Indiana',
    'AZ': 'Arizona', 'MN': 'Minnesota', 'NV': 'Nevada', 'NE': 'Nebraska', 'OR': 'Oregon', 'IA': 'Iowa', 'VA': 'Virginia',
    'ID': 'Idaho', 'NM': 'New Mexico', 'NJ': 'New Jersey', 'WV': 'West Virginia', 'OK': 'Oklahoma', 'AK': 'Alaska', 'RI': 'Rhode Island',
    'VT': 'Vermont', 'LA': 'Louisiana', 'ND': 'North Dakota', 'PR': 'Puerto Rico', 'MS': 'Mississippi', 'UT': 'Utah', 'MD': 'Maryland',
    'MT': 'Montana', 'WY': 'Wyoming', 'SD': 'South Dakota', 'DE': 'Delaware', 'DC': 'District of Columbia'
}
# Reverse mapping (full name -> abbreviation). Keys are title-cased because the data
# capitalises names that way (e.g. "District Of Columbia").
state_map = {full_name.title(): abbrev for abbrev, full_name in state_abbreviations.items()}


def _mean_aqi(source_col, alias):
    """Average of one AQI column over the group, rounded to a whole number."""
    # NOTE(review): the earlier rendered output shows whole-number averages; confirm that
    # rounding (rather than truncation) is the intended conversion.
    return F.round(F.avg(source_col), 0).cast("int").alias(alias)


# Yearly average AQI per state. The output columns (year, state_1, Average_*_AQI, stateabbr)
# are the names the MERGING cell joins on and selects.
df_air_pollution = (
    pollution_raw_df
    .select(F.col('State').alias('state_1'),
            F.year(F.col('Date Local')).alias('year'),
            'NO2 AQI', 'O3 AQI', 'SO2 AQI', 'CO AQI')
    .groupBy('year', 'state_1')
    .agg(_mean_aqi('NO2 AQI', 'Average_NO2_AQI'),
         _mean_aqi('O3 AQI', 'Average_O3_AQI'),
         _mean_aqi('SO2 AQI', 'Average_SO2_AQI'),
         _mean_aqi('CO AQI', 'Average_CO_AQI'))
    # Two-letter state code used as the join key; names with no mapping are left unchanged
    .withColumn('stateabbr', F.col('state_1'))
    .replace(state_map, subset=['stateabbr'])
    .orderBy('year', 'state_1')
)

# The MERGING cell refers to this frame as `pollution_df`
pollution_df = df_air_pollution

print("Average Air Quality Index by State for all years, ordered by Year and State:")
df_air_pollution.show()

24/03/06 21:00:09 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
                                                                                

Average Air Quality Index by State for all years, ordered by Year and State:


                                                                                

+----+--------------------+---------------+--------------+---------------+--------------+---------+
|year|             state_1|Average_NO2_AQI|Average_O3_AQI|Average_SO2_AQI|Average_CO_AQI|stateabbr|
+----+--------------------+---------------+--------------+---------------+--------------+---------+
|2000|             Arizona|             47|            39|              7|            15|       AZ|
|2000|          California|             29|            32|              6|            11|       CA|
|2000|            Colorado|             32|            27|             21|            11|       CO|
|2000|District Of Columbia|             37|            27|             24|            18|       DC|
|2000|             Florida|             25|            39|              7|            10|       FL|
|2000|            Illinois|             42|            24|             24|            11|       IL|
|2000|             Indiana|             31|            43|             24|            12|       IN|


### MARIADB

In [3]:
from pyspark.sql.functions import *
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf, sum, count
from pyspark.sql.types import IntegerType

import os

from pyspark.sql import functions as F

# Create (or reuse) the Spark session
spark3 = SparkSession.builder \
    .appName("UFO Sightings Analysis") \
    .master("local") \
    .getOrCreate()

# Define connection
server = "localhost"
port = 3306
database = "ufodataset"
jdbc_url = f"jdbc:mysql://{server}:{port}/{database}?permitMysqlScheme"

user = "haunguyen"
# Read the password from the environment rather than hard-coding it in the notebook
password = os.environ["MARIADB_PASSWORD"]
jdbc_driver = "org.mariadb.jdbc.Driver"

properties = {
    "user": user,
    "password": password,
    "driver": jdbc_driver
}

# Read the table from MariaDB
ufo_df = spark3.read.jdbc(jdbc_url, "(select * from mytable) tab", properties=properties)
spark3.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

# Year = last '/'-separated field of the first whitespace-separated token of 'datetime'
# (e.g. a value shaped like "10/10/1949 20:30" gives 1949).
# Built-in functions avoid a per-row Python UDF call, and a null or malformed value
# yields a null year instead of failing the whole job.
date_part = F.split(F.trim(F.col('datetime')), r'\s+').getItem(0)
ufo_df = ufo_df.withColumn('year', F.element_at(F.split(date_part, '/'), -1).cast(IntegerType()))

# Keep only US sightings
ufo_df = ufo_df.filter(col('country') == 'us')

# Total duration (seconds) and number of cases per (year, state).
# NOTE(review): every group in the rendered output has a case count that is a multiple
# of 4; check whether `mytable` contains duplicated rows.
ufo_df = ufo_df.groupBy('year', F.upper('state').alias('state')) \
    .agg(F.sum('durationseconds').alias('duration_in_sec_UFO_cases'),
         F.count('*').alias('total_UFO_cases_by_state')) \
    .select('year', 'state', col('duration_in_sec_UFO_cases').cast(IntegerType()), 'total_UFO_cases_by_state') \
    .orderBy('year', 'state', ascending=True)

print("Yearly UFO sightings by State:")
ufo_df.show()

24/03/06 21:00:36 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


Yearly UFO sightings by State:


[Stage 27:>                                                         (0 + 1) / 1]

+----+-----+-------------------------+------------------------+
|year|state|duration_in_sec_UFO_cases|total_UFO_cases_by_state|
+----+-----+-------------------------+------------------------+
|1910|   MO|                      480|                       4|
|1910|   TX|                      480|                       4|
|1920|   IN|                      240|                       4|
|1925|   IL|                      240|                       4|
|1929|   OR|                      240|                       4|
|1931|   CO|                      240|                       4|
|1931|   KS|                     7200|                       4|
|1934|   ND|                       20|                       4|
|1936|   AK|                      720|                       4|
|1937|   CA|                     2400|                       4|
|1937|   OR|                       40|                       4|
|1939|   AL|                     1200|                       4|
|1939|   IA|                     2400|  

                                                                                

In [4]:
# Per-year totals across all states, computed in one aggregation pass
# (one groupBy instead of two groupBys followed by a join).
# Null years are excluded, matching what the former inner join on 'year' did.
yearly_ufo_totals = ufo_df.filter(col('year').isNotNull()) \
    .groupBy('year') \
    .agg(sum('duration_in_sec_UFO_cases').alias('total_duration_in_sec_UFO_cases'),
         sum('total_UFO_cases_by_state').alias('total_UFO_cases_by_year'))

# Case totals alone, kept under the name earlier versions of this cell defined
yearly_ufo_cases = yearly_ufo_totals.select('year', 'total_UFO_cases_by_year')

# Rename the key to 'year_draft' so it does not collide with ufo_df.year in the next join
yearly_ufo_duration = yearly_ufo_totals.orderBy('year').withColumnRenamed('year', 'year_draft')

yearly_ufo_duration.show()


[Stage 31:>                                                         (0 + 1) / 1]

+----------+-------------------------------+-----------------------+
|year_draft|total_duration_in_sec_UFO_cases|total_UFO_cases_by_year|
+----------+-------------------------------+-----------------------+
|      1910|                            960|                      8|
|      1920|                            240|                      4|
|      1925|                            240|                      4|
|      1929|                            240|                      4|
|      1931|                           7440|                      8|
|      1934|                             20|                      4|
|      1936|                            720|                      4|
|      1937|                           2440|                      8|
|      1939|                           3640|                     12|
|      1941|                            120|                      4|
|      1942|                           1200|                      8|
|      1943|                      

                                                                                

In [5]:
# Attach the per-year totals to every (year, state) row of the UFO dataset.
# Previously attached totals are dropped first so that re-running this cell does not
# duplicate columns; DataFrame.drop ignores names that are not present.
ufo_df = ufo_df \
    .drop('total_duration_in_sec_UFO_cases', 'total_UFO_cases_by_year') \
    .join(yearly_ufo_duration.withColumnRenamed('year_draft', 'year'), on='year', how='inner') \
    .orderBy('year', 'state', ascending=True)

ufo_df.show()

[Stage 42:>   (0 + 1) / 1][Stage 43:>   (0 + 1) / 1][Stage 44:>   (0 + 1) / 1]1]

+----+-----+-------------------------+------------------------+-------------------------------+-----------------------+
|year|state|duration_in_sec_UFO_cases|total_UFO_cases_by_state|total_duration_in_sec_UFO_cases|total_UFO_cases_by_year|
+----+-----+-------------------------+------------------------+-------------------------------+-----------------------+
|1910|   MO|                      480|                       4|                            960|                      8|
|1910|   TX|                      480|                       4|                            960|                      8|
|1920|   IN|                      240|                       4|                            240|                      4|
|1925|   IL|                      240|                       4|                            240|                      4|
|1929|   OR|                      240|                       4|                            240|                      4|
|1931|   CO|                      240|  

                                                                                

### SNOWFLAKE

In [6]:
from pyspark.sql import SparkSession

import os

# Create (or reuse) the Spark session with the Snowflake JDBC driver and Spark connector.
# NOTE(review): a session already exists at this point (this cell's output warns "Using an
# existing Spark session"), so `spark.jars` is presumably not applied here. The jars live in
# /usr/local/spark/jars, which appears to be Spark's own jar directory. Also, this connector
# is the `_2.13` build while the MongoDB connector uses `_2.12`; confirm which Scala
# version this Spark build uses.
spark4 = SparkSession.builder \
    .appName("Snowflake Query") \
    .config("spark.jars", "/usr/local/spark/jars/snowflake-jdbc-3.14.5.jar,/usr/local/spark/jars/spark-snowflake_2.13-2.14.0-spark_3.4.jar") \
    .getOrCreate()

# Snowflake connection options. The password is read from the environment instead of
# being hard-coded; the previously committed password should be rotated.
sfOptions = {
  "sfURL" : "https://tltmqjz-yk82271.snowflakecomputing.com",
  "sfAccount" : "YK82271",
  "sfUser" : "HOWARDNGUYEN29",
  "sfPassword" : os.environ["SNOWFLAKE_PASSWORD"],
  "sfDatabase" : "GBCTRAINING3",
  "sfSchema" : "PUBLIC",
  "sfWarehouse" : "COMPUTE_WH",
}

# Per (year, state): aggregated CRIME figures, left-joined with BRFSS "Crude Prevalence"
# values from MENTAL_HEALTH.
# NOTE(review): Total_Mental_health_Cases_reported_by_BRFSS divides the summed prevalence
# by 1000 and then by 100 before multiplying by population; confirm the intended scaling.
query = """
    SELECT COALESCE(CRIME_JOIN.REPORT_YEAR, MENTAL_HEALTH.Year) AS Year,
       COALESCE(CRIME_JOIN.STATEABBR, MENTAL_HEALTH.LocationAbbr) AS STATEABBR,
       COALESCE(CRIME_JOIN.STATE, MENTAL_HEALTH.STATE) AS STATE,
       COALESCE(CRIME_JOIN.TOTAL_POPULATION, NULL) AS TOTAL_POPULATION,
       -- COALESCE(CRIME_JOIN.TOTAL_homicides, NULL) AS TOTAL_homicides,
       -- COALESCE(CRIME_JOIN.TOTAL_rapes, NULL) AS TOTAL_rapes,
       -- COALESCE(CRIME_JOIN.TOTAL_assaults, NULL) AS TOTAL_assaults,
       -- COALESCE(CRIME_JOIN.TOTAL_robberies, NULL) AS TOTAL_robberies,
       COALESCE(CRIME_JOIN.TOTAL_CRIMES, NULL) AS TOTAL_CRIMES,
       MENTAL_HEALTH.DATAVALUE AS Crude_Prevalence_of_Mental_health_Cases,
       ROUND(((MENTAL_HEALTH.DATAVALUE / 1000) * TOTAL_POPULATION) / 100, 0) AS Total_Mental_health_Cases_reported_by_BRFSS,        
       -- MENTAL_HEALTH.DataValueType

FROM (
    SELECT CRIME_STAT.REPORT_YEAR,
           CRIME_STAT.STATEABBR,
           CRIME_STAT.STATE,
           SUM(CRIME_STAT.POPULATION) AS TOTAL_POPULATION,
           SUM(CRIME_STAT.homicides) AS TOTAL_homicides,
           SUM(CRIME_STAT.rapes) AS TOTAL_rapes,
           SUM(CRIME_STAT.assaults) AS TOTAL_assaults,
           SUM(CRIME_STAT.robberies) AS TOTAL_robberies,
           SUM(CRIME_STAT.TOTAL_CRIMES) AS TOTAL_CRIMES
    FROM (
        SELECT CRIME.REPORT_YEAR,
               CRIME.StateAbbr,
               COALESCE(MENTAL_HEALTH.LocationDesc, CRIME.StateAbbr) AS STATE,
               CRIME.POPULATION, 
               CRIME.homicides AS homicides,
               CRIME.rapes AS rapes,
               CRIME.assaults AS assaults,
               CRIME.robberies AS robberies,
               CRIME.VIOLENT_CRIMES AS total_crimes,
               CRIME.homicides_percapita as homicides_percapita,
               CRIME.rapes_percapita as rapes_percapita,
               CRIME.assaults_percapita as assaults_percapita,
               CRIME.robberies_percapita as robberies_percapita,
               CRIME.CRIMES_PERCAPITA AS Crime_per_capita
        FROM (
            SELECT *,
                   SPLIT_PART(agency_jurisdiction, ', ', -1) AS StateAbbr
            FROM CRIME
        ) CRIME
        LEFT JOIN (
            SELECT DISTINCT
                   LocationAbbr,
                   LocationDesc
            FROM MENTAL_HEALTH
        ) MENTAL_HEALTH
        ON CRIME.StateAbbr = MENTAL_HEALTH.LocationAbbr
        WHERE State != 'United States'
    ) CRIME_STAT
    GROUP BY CRIME_STAT.REPORT_YEAR, CRIME_STAT.STATE, CRIME_STAT.STATEABBR
    ORDER BY CRIME_STAT.REPORT_YEAR, CRIME_STAT.STATEABBR
) CRIME_JOIN
LEFT JOIN (
    SELECT YearStart as Year,
           LocationAbbr,
           LocationDesc AS STATE,
           SUM(TRY_CAST(DataValue AS FLOAT)) AS DATAVALUE,
           DataValueType
    FROM MENTAL_HEALTH
    WHERE DataSource = 'BRFSS'
          AND Stratification1 = 'Overall'
          AND DataValueType =  'Crude Prevalence'
          AND LocationDesc != 'United States'
          AND TRY_CAST(DataValue AS FLOAT) IS NOT NULL
    GROUP BY Year, LocationAbbr, LocationDesc, DataValueType
    ORDER BY Year, LocationAbbr, DataValueType
) MENTAL_HEALTH
ON CRIME_JOIN.REPORT_YEAR = MENTAL_HEALTH.Year
   AND CRIME_JOIN.STATEABBR = MENTAL_HEALTH.LocationAbbr
ORDER BY Year, STATEABBR;


"""

# Load data from Snowflake using the query above
df_snowflake = spark4.read.format("snowflake") \
    .options(**sfOptions) \
    .option("query", query) \
    .load()

df_snowflake.show()



24/03/06 21:00:45 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


+----+---------+--------------------+----------------+------------+---------------------------------------+-------------------------------------------+
|YEAR|STATEABBR|               STATE|TOTAL_POPULATION|TOTAL_CRIMES|CRUDE_PREVALENCE_OF_MENTAL_HEALTH_CASES|TOTAL_MENTAL_HEALTH_CASES_REPORTED_BY_BRFSS|
+----+---------+--------------------+----------------+------------+---------------------------------------+-------------------------------------------+
|1975|       AZ|             Arizona|         1121722|        6726|                                   null|                                       null|
|1975|       CA|          California|         6858823|       67951|                                   null|                                       null|
|1975|       CO|            Colorado|          624796|        5571|                                   null|                                       null|
|1975|       DC|District of Columbia|          716000|       12704|                     

In [7]:
# Print the column names and Spark types of the Snowflake query result
df_snowflake.printSchema()

root
 |-- YEAR: decimal(38,0) (nullable = true)
 |-- STATEABBR: string (nullable = true)
 |-- STATE: string (nullable = true)
 |-- TOTAL_POPULATION: decimal(38,0) (nullable = true)
 |-- TOTAL_CRIMES: decimal(38,0) (nullable = true)
 |-- CRUDE_PREVALENCE_OF_MENTAL_HEALTH_CASES: double (nullable = true)
 |-- TOTAL_MENTAL_HEALTH_CASES_REPORTED_BY_BRFSS: double (nullable = true)



### SSMS (Azure SQL Database)

In [10]:
import pyspark
from pyspark.sql.session import SparkSession

import os

# Create (or reuse) a SparkSession.
# NOTE(review): if a session already exists, `spark.jars.packages` / extraClassPath below
# are presumably not applied (earlier cells show Spark warning that only runtime SQL
# configurations take effect on an existing session).
spark = SparkSession.builder \
    .appName("FirstRun") \
    .config("spark.jars.packages", "com.microsoft.azure:spark-mssql-connector_2.12:1.2.0") \
    .config("spark.driver.extraClassPath", "/usr/local/spark/jars/mssql-jdbc-6.1.0.jre8.jar") \
    .getOrCreate()

# Define connection parameters
server_name = "howardserver.database.windows.net"
database_name = "myfirstdatabase"
table_name = "Income_Unemployment_df"
url = f"jdbc:sqlserver://{server_name};databaseName={database_name};"

# Load the table from SQL Server. The password must be passed under the "password"
# option key; it is read from the environment instead of being hard-coded.
income_unemployment_df = spark.read \
    .format("com.microsoft.sqlserver.jdbc.spark") \
    .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") \
    .option("url", url) \
    .option("dbtable", table_name) \
    .option("user", "azureuser") \
    .option("password", os.environ["MSSQL_PASSWORD"]) \
    .load()
income_unemployment_df = income_unemployment_df.orderBy('Year', 'State_Abbr', ascending=True)


income_unemployment_df.show()

+----+-------------+----------+------+------------------------+--------------------------+-----------------+
|Year|        State|State_Abbr|Income|AVG_Number_of_Employment|AVG_Number_of_Unemployment|Unemployment_Rate|
+----+-------------+----------+------+------------------------+--------------------------+-----------------+
|1984|       Alaska|        AK| 72912|                    null|                      null|             null|
|1984|      Alabama|        AL| 39007|                    null|                      null|             null|
|1984|     Arkansas|        AR| 35320|                    null|                      null|             null|
|1984|      Arizona|        AZ| 48280|                    null|                      null|             null|
|1984|   California|        CA| 56983|                    null|                      null|             null|
|1984|     Colorado|        CO| 58141|                    null|                      null|             null|
|1984|  Connecticut

### MERGING

In [11]:
# Left-join every auxiliary source onto the UFO dataset (the main target of this project),
# matching each on (year, state abbreviation). Joins are applied in the order listed.
# Each entry: (alias, DataFrame, its year column, its state-abbreviation column).
_join_sources = [
    ("snowflake", df_snowflake, "YEAR", "STATEABBR"),
    ("pollution", pollution_df, "Year", "stateabbr"),
    ("political", political_df, "year", "state"),
    ("ssms", income_unemployment_df, "Year", "State_Abbr"),
]

_merged = ufo_df.alias("ufo")
for _alias, _frame, _year_col, _state_col in _join_sources:
    _on_year = col("ufo.year") == col(f"{_alias}.{_year_col}")
    _on_state = col("ufo.state") == col(f"{_alias}.{_state_col}")
    _merged = _merged.join(_frame.alias(_alias), _on_year & _on_state, how='left')

# Output columns, grouped by the source they come from (order is significant)
_output_columns = [
    ("ufo", ["year", "state", "total_UFO_cases_by_state", "total_UFO_cases_by_year",
             "duration_in_sec_UFO_cases", "total_duration_in_sec_UFO_cases"]),
    ("snowflake", ["TOTAL_CRIMES", "CRUDE_PREVALENCE_OF_MENTAL_HEALTH_CASES",
                   "TOTAL_MENTAL_HEALTH_CASES_REPORTED_BY_BRFSS", "TOTAL_POPULATION"]),
    ("pollution", ["Average_NO2_AQI", "Average_O3_AQI", "Average_SO2_AQI", "Average_CO_AQI"]),
    ("political", ["not voted", "Democrat", "Republican", "Other Party", "total_votes"]),
    ("ssms", ["Income", "AVG_Number_of_Employment", "AVG_Number_of_Unemployment", "Unemployment_Rate"]),
]
joined_df = _merged.select(*[col(f"{_src}.{_name}")
                             for _src, _names in _output_columns
                             for _name in _names])

# Most recent years first
joined_df = joined_df.orderBy('year', 'state', ascending=False)

joined_df.show()


                                                                                

+----+-----+------------------------+-----------------------+-------------------------+-------------------------------+------------+---------------------------------------+-------------------------------------------+----------------+---------------+--------------+---------------+--------------+---------+--------+----------+-----------+-----------+------+------------------------+--------------------------+-----------------+
|year|state|total_UFO_cases_by_state|total_UFO_cases_by_year|duration_in_sec_UFO_cases|total_duration_in_sec_UFO_cases|TOTAL_CRIMES|CRUDE_PREVALENCE_OF_MENTAL_HEALTH_CASES|TOTAL_MENTAL_HEALTH_CASES_REPORTED_BY_BRFSS|TOTAL_POPULATION|Average_NO2_AQI|Average_O3_AQI|Average_SO2_AQI|Average_CO_AQI|not voted|Democrat|Republican|Other Party|total_votes|Income|AVG_Number_of_Employment|AVG_Number_of_Unemployment|Unemployment_Rate|
+----+-----+------------------------+-----------------------+-------------------------+-------------------------------+------------+--------------

In [12]:
# Print the column names and Spark types of the merged dataset
joined_df.printSchema()

root
 |-- year: integer (nullable = true)
 |-- state: string (nullable = true)
 |-- total_UFO_cases_by_state: long (nullable = false)
 |-- total_UFO_cases_by_year: long (nullable = true)
 |-- duration_in_sec_UFO_cases: integer (nullable = true)
 |-- total_duration_in_sec_UFO_cases: long (nullable = true)
 |-- TOTAL_CRIMES: decimal(38,0) (nullable = true)
 |-- CRUDE_PREVALENCE_OF_MENTAL_HEALTH_CASES: double (nullable = true)
 |-- TOTAL_MENTAL_HEALTH_CASES_REPORTED_BY_BRFSS: double (nullable = true)
 |-- TOTAL_POPULATION: decimal(38,0) (nullable = true)
 |-- Average_NO2_AQI: integer (nullable = true)
 |-- Average_O3_AQI: integer (nullable = true)
 |-- Average_SO2_AQI: integer (nullable = true)
 |-- Average_CO_AQI: integer (nullable = true)
 |-- not voted: long (nullable = true)
 |-- Democrat: long (nullable = true)
 |-- Republican: long (nullable = true)
 |-- Other Party: long (nullable = true)
 |-- total_votes: long (nullable = true)
 |-- Income: integer (nullable = true)
 |-- AVG_Numbe

### ML Decision Tree

In [13]:
# Keys, the two UFO targets and the candidate explanatory features for the model.
# The mental-health columns are currently excluded (kept commented out for reference).
_ml_columns = [
    "year",
    "state",
    "duration_in_sec_UFO_cases",
    "total_duration_in_sec_UFO_cases",
    "total_UFO_cases_by_state",
    "TOTAL_CRIMES",
    # "CRUDE_PREVALENCE_OF_MENTAL_HEALTH_CASES",
    # "TOTAL_MENTAL_HEALTH_CASES_REPORTED_BY_BRFSS",
    "Income",
    "AVG_Number_of_Unemployment",
    "AVG_Number_of_Employment",
    "Unemployment_Rate",
]

# Sort chronologically, then keep only rows where every selected column is present
joined_df_2 = (
    joined_df
    .select(*[col(_name) for _name in _ml_columns])
    .orderBy('year', 'state', ascending=True)
    .dropna()
)

joined_df_2.show()

                                                                                

+----+-----+-------------------------+-------------------------------+------------------------+------------+------+--------------------------+------------------------+-----------------+
|year|state|duration_in_sec_UFO_cases|total_duration_in_sec_UFO_cases|total_UFO_cases_by_state|TOTAL_CRIMES|Income|AVG_Number_of_Unemployment|AVG_Number_of_Employment|Unemployment_Rate|
+----+-----+-------------------------+-------------------------------+------------------------+------------+------+--------------------------+------------------------+-----------------+
|1990|   AZ|                    14160|                         958172|                      20|       16069| 53309|                  12136.64|               230027.57|             5.01|
|1990|   CA|                    99920|                         958172|                     136|      152971| 60726|                  32644.26|               537452.08|             5.73|
|1990|   CO|                     1760|                         958172|

In [14]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.evaluation import RegressionEvaluator

# Held-out year: the models are trained on every other year and scored on this one.
# (The frame is still called `data_2014` because later cells use that name.)
TEST_YEAR = 2014

# Explanatory variables fed to both models (the mental-health columns are not used).
FEATURE_COLS = [
    'TOTAL_CRIMES',
    'Income',
    'AVG_Number_of_Unemployment',
    'AVG_Number_of_Employment',
    'Unemployment_Rate',
]

# Assemble the explanatory variables into the single vector column Spark ML expects
assembler = VectorAssembler(inputCols=FEATURE_COLS, outputCol='features')
data = assembler.transform(joined_df_2)

# Split by year. Each split is read by several actions below (two fits or two
# transforms, plus the evaluations), so cache it instead of recomputing the
# whole upstream lineage for every action.
data_2014 = data.filter(col('year') == TEST_YEAR).cache()
data_rest = data.filter(col('year') != TEST_YEAR).cache()


def fit_and_evaluate_tree(train_df, test_df, label_col):
    """Train a decision-tree regressor for one target and score it on held-out rows.

    Parameters
    ----------
    train_df, test_df : pyspark.sql.DataFrame
        Frames containing a ``features`` vector column and ``label_col``.
    label_col : str
        Name of the numeric target column.

    Returns
    -------
    tuple
        ``(fitted_model, predictions, evaluator, rmse)``: ``predictions`` is
        ``test_df`` with an added ``prediction`` column, ``evaluator`` is the
        RMSE evaluator for ``label_col``, and ``rmse`` is its score on
        ``predictions``.
    """
    # The unfitted estimator and the fitted model are kept under separate names.
    estimator = DecisionTreeRegressor(featuresCol="features", labelCol=label_col)
    model = estimator.fit(train_df)
    predictions = model.transform(test_df)
    evaluator = RegressionEvaluator(labelCol=label_col, predictionCol="prediction", metricName="rmse")
    return model, predictions, evaluator, evaluator.evaluate(predictions)


# Target 1: duration of the UFO sightings in seconds
dt_model_duration, predictions_duration, evaluator, rmse = fit_and_evaluate_tree(
    data_rest, data_2014, "duration_in_sec_UFO_cases")

# Target 2: number of UFO cases per state
dt_model_total, predictions_total_cases, evaluator2, rmse2 = fit_and_evaluate_tree(
    data_rest, data_2014, "total_UFO_cases_by_state")

print("Root Mean Squared Error (RMSE) on test data of the duration of UFO sightings in seconds:", rmse)
print("Root Mean Squared Error (RMSE) on test data of total UFO cases:", rmse2)

                                                                                

Root Mean Squared Error (RMSE) on test data of the duration of UFO sightings in seconds: 27547322.24197251
Root Mean Squared Error (RMSE) on test data of total UFO cases: 323.0161182828035


In [15]:
from pyspark.sql import functions as F

# Put each target's observed value next to the model's prediction for it.
# Both prediction frames are transforms of the same test rows, so an inner join
# on (year, state) pairs them up. This assumes (year, state) is unique in the
# test data; otherwise the join would multiply rows.
selected_data = predictions_duration.alias("duration").join(
    predictions_total_cases.alias("total_cases"),
    (col("duration.year") == col("total_cases.year")) & (col("duration.state") == col("total_cases.state")),
    how='inner'
).select(
    "duration.year", "duration.state",
    col("total_cases.total_UFO_cases_by_state"),
    # Round to the nearest integer before casting: a bare cast("int") truncates
    # toward zero and so biases positive predictions downward.
    F.round(col("total_cases.prediction")).cast("int").alias("predicted_total_UFO_cases_by_state"),
    col("duration.duration_in_sec_UFO_cases"),
    F.round(col("duration.prediction")).cast("int").alias("predicted_duration_in_sec_UFO_cases"),
)
ML_UFO_cases_df = selected_data.orderBy('year', 'state', ascending=True)

print("Predict the total UFO cases and the duration of UFO sightings in seconds based on total crime cases, income of citizens, and unemployment rate in states:")
ML_UFO_cases_df.show()

Predict the total UFO cases and the duration of UFO sightings in seconds based on total crime cases, income of citizens, and unemployment rate in states:


                                                                                

+----+-----+------------------------+----------------------------------+-------------------------+-----------------------------------+
|year|state|total_UFO_cases_by_state|predicted_total_UFO_cases_by_state|duration_in_sec_UFO_cases|predicted_duration_in_sec_UFO_cases|
+----+-----+------------------------+----------------------------------+-------------------------+-----------------------------------+
|2014|   AZ|                     392|                               517|                   288644|                             474152|
|2014|   CA|                    1060|                              2319|                   632200|                            1538484|
|2014|   CO|                     156|                               247|                   130892|                             789940|
|2014|   FL|                     892|                              1664|                   614852|                             474152|
|2014|   GA|                     244|                  

In [23]:
# Normalise column types: string keys and integer counts.
column_casts = {
    "year": "string",
    "state": "string",
    "TOTAL_CRIMES": "integer",
    "TOTAL_POPULATION": "integer",
}
for column_name, spark_type in column_casts.items():
    joined_df = joined_df.withColumn(column_name, joined_df[column_name].cast(spark_type))

# (source alias, column name) pairs, in output order. Each ML prediction sits
# right after the observed value it estimates.
output_columns = [
    ("joined", "year"),
    ("joined", "state"),
    ("joined", "total_UFO_cases_by_state"),
    ("ML", "predicted_total_UFO_cases_by_state"),
    ("joined", "total_UFO_cases_by_year"),
    ("joined", "duration_in_sec_UFO_cases"),
    ("ML", "predicted_duration_in_sec_UFO_cases"),
    ("joined", "total_duration_in_sec_UFO_cases"),
    ("joined", "TOTAL_CRIMES"),
    ("joined", "CRUDE_PREVALENCE_OF_MENTAL_HEALTH_CASES"),
    ("joined", "TOTAL_MENTAL_HEALTH_CASES_REPORTED_BY_BRFSS"),
    ("joined", "TOTAL_POPULATION"),
    ("joined", "Average_NO2_AQI"),
    ("joined", "Average_O3_AQI"),
    ("joined", "Average_SO2_AQI"),
    ("joined", "Average_CO_AQI"),
    ("joined", "Income"),
    ("joined", "not voted"),
    ("joined", "Democrat"),
    ("joined", "Republican"),
    ("joined", "Other Party"),
    ("joined", "total_votes"),
    ("joined", "AVG_Number_of_Employment"),
    ("joined", "AVG_Number_of_Unemployment"),
    ("joined", "Unemployment_Rate"),
]

# The left join keeps every row of joined_df. Rows with no matching
# (year, state) in the ML output get null predictions.
same_year_and_state = (col("joined.year") == col("ML.year")) & (col("joined.state") == col("ML.state"))
joined_df = (
    joined_df.alias("joined")
    .join(ML_UFO_cases_df.alias("ML"), same_year_and_state, how='left')
    .select(*[col(f"{alias}.{name}") for alias, name in output_columns])
    .orderBy('year', 'state', ascending=True)
)

joined_df.show()


                                                                                

+----+-----+------------------------+----------------------------------+-----------------------+-------------------------+-----------------------------------+-------------------------------+------------+---------------------------------------+-------------------------------------------+----------------+---------------+--------------+---------------+--------------+------+---------+--------+----------+-----------+-----------+------------------------+--------------------------+-----------------+
|year|state|total_UFO_cases_by_state|predicted_total_UFO_cases_by_state|total_UFO_cases_by_year|duration_in_sec_UFO_cases|predicted_duration_in_sec_UFO_cases|total_duration_in_sec_UFO_cases|TOTAL_CRIMES|CRUDE_PREVALENCE_OF_MENTAL_HEALTH_CASES|TOTAL_MENTAL_HEALTH_CASES_REPORTED_BY_BRFSS|TOTAL_POPULATION|Average_NO2_AQI|Average_O3_AQI|Average_SO2_AQI|Average_CO_AQI|Income|not voted|Democrat|Republican|Other Party|total_votes|AVG_Number_of_Employment|AVG_Number_of_Unemployment|Unemployment_Rate|
+---

24/03/06 22:01:27 WARN DAGScheduler: Broadcasting large task binary with size 1280.2 KiB


In [24]:
# Show the column names and types of joined_df (after the casts and the left join with the ML predictions).
joined_df.printSchema()

root
 |-- year: string (nullable = true)
 |-- state: string (nullable = true)
 |-- total_UFO_cases_by_state: long (nullable = false)
 |-- predicted_total_UFO_cases_by_state: integer (nullable = true)
 |-- total_UFO_cases_by_year: long (nullable = true)
 |-- duration_in_sec_UFO_cases: integer (nullable = true)
 |-- predicted_duration_in_sec_UFO_cases: integer (nullable = true)
 |-- total_duration_in_sec_UFO_cases: long (nullable = true)
 |-- TOTAL_CRIMES: integer (nullable = true)
 |-- CRUDE_PREVALENCE_OF_MENTAL_HEALTH_CASES: double (nullable = true)
 |-- TOTAL_MENTAL_HEALTH_CASES_REPORTED_BY_BRFSS: double (nullable = true)
 |-- TOTAL_POPULATION: integer (nullable = true)
 |-- Average_NO2_AQI: integer (nullable = true)
 |-- Average_O3_AQI: integer (nullable = true)
 |-- Average_SO2_AQI: integer (nullable = true)
 |-- Average_CO_AQI: integer (nullable = true)
 |-- Income: integer (nullable = true)
 |-- not voted: long (nullable = true)
 |-- Democrat: long (nullable = true)
 |-- Republica

In [28]:
!pip install sqlalchemy
!pip install pyodbc

Defaulting to user installation because normal site-packages is not writeable
Collecting sqlalchemy
  Downloading SQLAlchemy-2.0.28-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (9.6 kB)
Collecting greenlet!=0.4.17 (from sqlalchemy)
  Downloading greenlet-3.0.3-cp310-cp310-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl.metadata (3.8 kB)
Downloading SQLAlchemy-2.0.28-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (3.1 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.1/3.1 MB[0m [31m38.1 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
[?25hDownloading greenlet-3.0.3-cp310-cp310-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl (616 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m616.0/616.0 kB[0m [31m27.9 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: greenlet, sqlalchemy
Successfully installed greenlet-3.0.3 sqlalchemy-2.0.28
Defaulting to user installation because normal site-packages is not 

### Writing the integrated table to Azure SQL Database (viewable in SSMS)

In [25]:
import getpass
import os

import pandas as pd
import sqlalchemy

# Collect data from the Spark DF to the driver node.
# NOTE: collect() pulls every row into driver memory.
joined_df_collected = joined_df.collect()

# Convert the collected rows to a pandas DF
pandas_df = pd.DataFrame(joined_df_collected, columns=joined_df.columns)

# Build the connection URL from the environment instead of hardcoding credentials
# in the notebook. Rotate the password that was previously committed here.
# URL.create also escapes special characters in the password, which a
# hand-assembled URL string does not.
connection_url = sqlalchemy.engine.URL.create(
    "mssql+pyodbc",
    username=os.environ.get("AZURE_SQL_USER", "azureuser"),
    password=os.environ.get("AZURE_SQL_PASSWORD") or getpass.getpass("Azure SQL password: "),
    host=os.environ.get("AZURE_SQL_SERVER", "howardserver.database.windows.net"),
    database=os.environ.get("AZURE_SQL_DATABASE", "myfirstdatabase"),
    query={"driver": "ODBC Driver 18 for SQL Server"},
)

# Create the SQLAlchemy engine
engine = sqlalchemy.create_engine(connection_url)

# Target table name
table_name = 'UFO_integration_df'

try:
    # engine.begin() commits on success, rolls back on error and always closes
    # the connection. A bare engine.connect() + close() leaks the connection if
    # to_sql raises.
    with engine.begin() as connection:
        # if_exists='replace' drops any existing table of this name, recreates it
        # from the DataFrame's columns/dtypes and inserts all rows.
        pandas_df.to_sql(table_name, connection, if_exists='replace', index=False)
finally:
    # Release the pooled DBAPI connections held by the engine.
    engine.dispose()

24/03/06 22:02:15 WARN DAGScheduler: Broadcasting large task binary with size 1247.4 KiB
