## Initializations

In [711]:
# Needed to enable %%sparksql magic (Enables SQL instructions execution)
%load_ext sparksql_magic

The sparksql_magic extension is already loaded. To reload it, use:
  %reload_ext sparksql_magic


In [712]:
# Imports
from pyspark.sql import SparkSession

from pyspark.sql.types import IntegerType
from pyspark.sql.functions import col,lag, lit, round, mean as _mean, sum as _sum, expr, create_map, to_date, month, dayofmonth, date_format

In [713]:
# Create SparkSession
spark = SparkSession.builder.appName("Meteor Showers") \
    .enableHiveSupport() \
    .getOrCreate()

In [714]:
# Config
cBaseDataPath = "file:///C:/Users/manso/LocalDocuments/10-TechProjects/meteor-showers/data/"

## Create DB structures (according to medallion architecture)

### Bronze Layer

In [715]:
%%sparksql

-- Create dbMeteorShowers Bronze Layer database

-- DROP section (if needed)
--DROP DATABASE dbBMeteorShowers

-- Create database
CREATE DATABASE IF NOT EXISTS dbBMeteorShowers;

### Silver Layer

In [716]:
%%sparksql

-- Create dbMeteorShowers Silver Layer database

-- DROP section (if needed)
--DROP DATABASE dbSMeteorShowers

-- Create database
CREATE DATABASE IF NOT EXISTS dbSMeteorShowers;

### Gold Layer

In [717]:
%%sparksql

-- Create dbMeteorShowers Gold Layer database

-- DROP section (if needed)
--DROP DATABASE dbGMeteorShowers

-- Create database
CREATE DATABASE IF NOT EXISTS dbGMeteorShowers;

In [718]:
# List existing databases
spark.sql('show databases').show()

# List train database tables
tablesdbb = spark.catalog.listTables('dbbmeteorshowers')

print("Table list:")
for table in tablesdbb:
    print(table.name)

+----------------+
|       namespace|
+----------------+
|dbbmeteorshowers|
|dbgmeteorshowers|
|dbsmeteorshowers|
|         default|
+----------------+



Table list:
cities
constellations
meteorshowers
moonphases
AuxCM
stage_moon_phases


# Exercise - Cleanse meteor data

In [719]:
# Import all four .csv files

rawLoadDataEntitiesDict = {
    "meteor_showers": "meteorshowers.csv",
    "moon_phases" : "moonphases.csv",
    "constellations" : "constellations.csv",
    "cities" : "cities.csv"
}

dfmeteor_showers = spark.read.options(inferSchema="True", header= "True").csv(cBaseDataPath + rawLoadDataEntitiesDict["meteor_showers"])
dfmoon_phases = spark.read.options(inferSchema="True", header= "True").csv(cBaseDataPath + rawLoadDataEntitiesDict["moon_phases"])
dfconstellations = spark.read.options(inferSchema="True", header= "True").csv(cBaseDataPath + rawLoadDataEntitiesDict["constellations"])
dfcities = spark.read.options(inferSchema="True", header= "True").csv(cBaseDataPath + rawLoadDataEntitiesDict["cities"])


## Explore data

In [720]:
dfmeteor_showers.toPandas().info()
dfmeteor_showers.show(5)

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 5 entries, 0 to 4
Data columns (total 9 columns):
 #   Column               Non-Null Count  Dtype 
---  ------               --------------  ----- 
 0   name                 5 non-null      object
 1   radiant              5 non-null      object
 2   bestmonth            5 non-null      object
 3   startmonth           5 non-null      object
 4   startday             5 non-null      int32 
 5   endmonth             5 non-null      object
 6   endday               5 non-null      int32 
 7   hemisphere           5 non-null      object
 8   preferredhemisphere  5 non-null      object
dtypes: int32(2), object(7)
memory usage: 452.0+ bytes
+------------+--------+---------+----------+--------+--------+------+------------------+-------------------+
|        name| radiant|bestmonth|startmonth|startday|endmonth|endday|        hemisphere|preferredhemisphere|
+------------+--------+---------+----------+--------+--------+------+------------------+

In [721]:
#dfmoon_phases.toPandas().info()
dfmoon_phases.show(5)

+-------+---+-------------+------------+
|  month|day|    moonphase|specialevent|
+-------+---+-------------+------------+
|january|  1|         NULL|        NULL|
|january|  2|first quarter|        NULL|
|january|  3|         NULL|        NULL|
|january|  4|         NULL|        NULL|
|january|  5|         NULL|        NULL|
+-------+---+-------------+------------+
only showing top 5 rows



In [722]:
#dfconstellations.toPandas().info()
dfconstellations.show(5)

+-------------+---------+-------------+-----------+-------------------+----------+
|constellation|bestmonth|latitudestart|latitudeend|           besttime|hemisphere|
+-------------+---------+-------------+-----------+-------------------+----------+
|         Lyra|   august|           90|        -40|2024-02-20 21:00:00|  northern|
|     Aquarius|  october|           65|        -90|2024-02-20 21:00:00|  southern|
|        Orion|  january|           85|        -75|2024-02-20 21:00:00|  northern|
|      Perseus| december|           90|        -35|2024-02-20 21:00:00|  northern|
|          Leo|    april|           90|         65|2024-02-20 21:00:00|  northern|
+-------------+---------+-------------+-----------+-------------------+----------+



In [723]:
dfcities.toPandas().info()
dfcities.show(5)

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 256 entries, 0 to 255
Data columns (total 3 columns):
 #   Column    Non-Null Count  Dtype  
---  ------    --------------  -----  
 0   city      256 non-null    object 
 1   latitude  256 non-null    float64
 2   country   256 non-null    object 
dtypes: float64(1), object(2)
memory usage: 6.1+ KB
+-----------+--------+--------------------+
|       city|latitude|             country|
+-----------+--------+--------------------+
|  Abu Dhabi|   24.47|United Arab Emirates|
|      Abuja|    9.07|             Nigeria|
|      Accra|    5.55|               Ghana|
|  Adamstown|  -25.07|    Pitcairn Islands|
|Addis Ababa|    9.02|            Ethiopia|
+-----------+--------+--------------------+
only showing top 5 rows



 ## Load Raw data to Bronze Layer

In [724]:
# Write to (managed) tables on dbMeteorShowers database

dfmeteor_showers.write.mode("Overwrite")\
    .saveAsTable("dbBMeteorShowers.MeteorShowers")

dfmoon_phases.write.mode("Overwrite")\
    .saveAsTable("dbBMeteorShowers.MoonPhases")

dfconstellations.write.mode("Overwrite")\
    .saveAsTable("dbBMeteorShowers.Constellations")

dfcities.write.mode("Overwrite")\
    .saveAsTable("dbBMeteorShowers.Cities")


### Explore Bronze layer information

In [725]:
%%sparksql

-- Explore Bronze layer information

select * 
from dbBMeteorShowers.Cities
LIMIT 5
;


0,1,2
city,latitude,country
Abu Dhabi,24.47,United Arab Emirates
Abuja,9.07,Nigeria
Accra,5.55,Ghana
Adamstown,-25.07,Pitcairn Islands
Addis Ababa,9.02,Ethiopia


## Transform data

In [726]:
# Configure value transformation dictionary structures

trfMonths = {'january':1, 'february':2, 'march':3, 'april':4, 'may':5, 'june':6, 'july':7, 'august':8, 'september':9, 'october':10, 'november':11, 'december':12}
trfHemispheres = {'northern':0, 'southern':1, 'northern, southern':3}
trfPhases = {'new moon':0,'third quarter':0.5, 'first quarter':0.5,'full moon':1.0}

In [727]:
# Convert the month columns to numbers:

# Create map structure for months (check List Comprehension on Python)
mapMonths_col = create_map([lit(x) for i in trfMonths.items() for x in i])

dfmeteor_showers = dfmeteor_showers.withColumn("bestmonthnum", mapMonths_col[col('bestmonth')]) \
    .withColumn("startmonthnum", mapMonths_col[col('startmonth')]) \
    .withColumn("endmonthnum", mapMonths_col[col('endmonth')])

#dfmeteor_showers.show()


dfmoon_phases = dfmoon_phases.withColumn("monthnum", mapMonths_col[col('month')])

#dfmoon_phases.show()


dfconstellations = dfconstellations.withColumn("bestmonthnum", mapMonths_col[col('bestmonth')])

#dfconstellations.show()


In [728]:
dfmeteor_showers.toPandas().info()
dfmeteor_showers.show(5)

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 5 entries, 0 to 4
Data columns (total 12 columns):
 #   Column               Non-Null Count  Dtype 
---  ------               --------------  ----- 
 0   name                 5 non-null      object
 1   radiant              5 non-null      object
 2   bestmonth            5 non-null      object
 3   startmonth           5 non-null      object
 4   startday             5 non-null      int32 
 5   endmonth             5 non-null      object
 6   endday               5 non-null      int32 
 7   hemisphere           5 non-null      object
 8   preferredhemisphere  5 non-null      object
 9   bestmonthnum         5 non-null      int32 
 10  startmonthnum        5 non-null      int32 
 11  endmonthnum          5 non-null      int32 
dtypes: int32(5), object(7)
memory usage: 512.0+ bytes
+------------+--------+---------+----------+--------+--------+------+------------------+-------------------+------------+-------------+-----------+
|        n

In [729]:
# Create two new columns: startdate and enddate. These columns will contain a month and day in 2020:

dfmeteor_showers = dfmeteor_showers.withColumn("startdate", to_date(lit(2020)*lit(10000)+col("startmonthnum")*lit(100)+col("startday"), "yyyyMMdd")) \
                .withColumn("enddate", to_date(lit(2020)*lit(10000)+col("endmonthnum")*lit(100)+col("endday"), "yyyyMMdd")) 

# ... Or with use of SQL in expr() function:
#dfmeteor_showers = dfmeteor_showers.withColumn("startdate", expr("to_date(2020*10000+startmonthnum*100+startday,'yyyyMMdd')"))

dfmeteor_showers.printSchema()
dfmeteor_showers.show(5)


root
 |-- name: string (nullable = true)
 |-- radiant: string (nullable = true)
 |-- bestmonth: string (nullable = true)
 |-- startmonth: string (nullable = true)
 |-- startday: integer (nullable = true)
 |-- endmonth: string (nullable = true)
 |-- endday: integer (nullable = true)
 |-- hemisphere: string (nullable = true)
 |-- preferredhemisphere: string (nullable = true)
 |-- bestmonthnum: integer (nullable = true)
 |-- startmonthnum: integer (nullable = true)
 |-- endmonthnum: integer (nullable = true)
 |-- startdate: date (nullable = true)
 |-- enddate: date (nullable = true)

+------------+--------+---------+----------+--------+--------+------+------------------+-------------------+------------+-------------+-----------+----------+----------+
|        name| radiant|bestmonth|startmonth|startday|endmonth|endday|        hemisphere|preferredhemisphere|bestmonthnum|startmonthnum|endmonthnum| startdate|   enddate|
+------------+--------+---------+----------+--------+--------+------+---

In [730]:
# Follow the same pattern for moon_phases:

dfmoon_phases = dfmoon_phases.withColumn("date", to_date(lit(2020)*lit(10000)+col("monthnum")*lit(100)+col("day"), "yyyyMMdd"))

dfmoon_phases.printSchema()
dfmoon_phases.show()


root
 |-- month: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- moonphase: string (nullable = true)
 |-- specialevent: string (nullable = true)
 |-- monthnum: integer (nullable = true)
 |-- date: date (nullable = true)

+-------+---+-------------+------------+--------+----------+
|  month|day|    moonphase|specialevent|monthnum|      date|
+-------+---+-------------+------------+--------+----------+
|january|  1|         NULL|        NULL|       1|2020-01-01|
|january|  2|first quarter|        NULL|       1|2020-01-02|
|january|  3|         NULL|        NULL|       1|2020-01-03|
|january|  4|         NULL|        NULL|       1|2020-01-04|
|january|  5|         NULL|        NULL|       1|2020-01-05|
|january|  6|         NULL|        NULL|       1|2020-01-06|
|january|  7|         NULL|        NULL|       1|2020-01-07|
|january|  8|         NULL|        NULL|       1|2020-01-08|
|january|  9|         NULL|        NULL|       1|2020-01-09|
|january| 10|    full moon|  

In [731]:
# Convert hemisphere data to numbers by using the mapping process:

# Create map structure for Hemispheres (check List Comprehension on Python)
mapHemispheres_col = create_map([lit(x) for i in trfHemispheres.items() for x in i])

dfmeteor_showers = dfmeteor_showers.withColumn("hemispherenum", mapHemispheres_col[col('hemisphere')])

dfconstellations = dfconstellations.withColumn("hemispherenum", mapHemispheres_col[col('hemisphere')])

dfmeteor_showers.show(5)
dfconstellations.show(5)


""" hemispheres = {'northern':0, 'southern':1, 'northern, southern':3}
meteor_showers.hemisphere = meteor_showers.hemisphere.map(hemispheres)
constellations.hemisphere = constellations.hemisphere.map(hemispheres) """

+------------+--------+---------+----------+--------+--------+------+------------------+-------------------+------------+-------------+-----------+----------+----------+-------------+
|        name| radiant|bestmonth|startmonth|startday|endmonth|endday|        hemisphere|preferredhemisphere|bestmonthnum|startmonthnum|endmonthnum| startdate|   enddate|hemispherenum|
+------------+--------+---------+----------+--------+--------+------+------------------+-------------------+------------+-------------+-----------+----------+----------+-------------+
|      Lyrids|    Lyra|    april|     april|      21|   april|    22|          northern|           northern|           4|            4|          4|2020-04-21|2020-04-22|            0|
|Eta Aquarids|Aquarius|      may|     april|      19|     may|    28|northern, southern|           southern|           5|            4|          5|2020-04-19|2020-05-28|            3|
|    Orionids|   Orion|  october|   october|       2|november|     7|northern, s

" hemispheres = {'northern':0, 'southern':1, 'northern, southern':3}\nmeteor_showers.hemisphere = meteor_showers.hemisphere.map(hemispheres)\nconstellations.hemisphere = constellations.hemisphere.map(hemispheres) "

In [732]:
# Convert Moon phases to numbers that represent the percentage of the Moon that's visible

# Create map structure for Moon phases (check List Comprehension on Python)
mapPhases_col = create_map([lit(x) for i in trfPhases.items() for x in i])

dfmoon_phases = dfmoon_phases.withColumn("percentage", mapPhases_col[col('moonphase')])

dfmoon_phases.show()


+-------+---+-------------+------------+--------+----------+----------+
|  month|day|    moonphase|specialevent|monthnum|      date|percentage|
+-------+---+-------------+------------+--------+----------+----------+
|january|  1|         NULL|        NULL|       1|2020-01-01|      NULL|
|january|  2|first quarter|        NULL|       1|2020-01-02|       0.5|
|january|  3|         NULL|        NULL|       1|2020-01-03|      NULL|
|january|  4|         NULL|        NULL|       1|2020-01-04|      NULL|
|january|  5|         NULL|        NULL|       1|2020-01-05|      NULL|
|january|  6|         NULL|        NULL|       1|2020-01-06|      NULL|
|january|  7|         NULL|        NULL|       1|2020-01-07|      NULL|
|january|  8|         NULL|        NULL|       1|2020-01-08|      NULL|
|january|  9|         NULL|        NULL|       1|2020-01-09|      NULL|
|january| 10|    full moon|        NULL|       1|2020-01-10|       1.0|
|january| 11|         NULL|        NULL|       1|2020-01-11|    

In [733]:
# Remove unnecessary data
dfmeteor_showers = dfmeteor_showers.drop("startmonth", "startday", "endmonth", "endday", "hemisphere")
dfmoon_phases = dfmoon_phases.drop("month","day","moonphase","specialevent")
dfconstellations = dfconstellations.drop("besttime")

dfmeteor_showers.show(5)
dfmoon_phases.show(5)
dfconstellations.show(5)

+------------+--------+---------+-------------------+------------+-------------+-----------+----------+----------+-------------+
|        name| radiant|bestmonth|preferredhemisphere|bestmonthnum|startmonthnum|endmonthnum| startdate|   enddate|hemispherenum|
+------------+--------+---------+-------------------+------------+-------------+-----------+----------+----------+-------------+
|      Lyrids|    Lyra|    april|           northern|           4|            4|          4|2020-04-21|2020-04-22|            0|
|Eta Aquarids|Aquarius|      may|           southern|           5|            4|          5|2020-04-19|2020-05-28|            3|
|    Orionids|   Orion|  october| northern, southern|          10|           10|         11|2020-10-02|2020-11-07|            3|
|    Perseids| Perseus|   august|           northern|           8|            7|          8|2020-07-14|2020-08-24|            0|
|     Leonids|     Leo| november| northern, southern|          11|           11|         11|2020-

In [734]:
# Figuring out a more accurate percentage for moon_phases:

# 1.Create a variable to save the last phase that you saw.
# 2.Loop through each row and column in the moon_phases DataFrame.
# 3.If the value in the percentage column of a row is NaN (null), then replace it with the last phase that you saw.
# 4.If the value isn't NaN, then save the value as the last phase that you saw.


#dfmoon_phases.show(5)

# Generate ID integer key column from date
dfmoon_phases = dfmoon_phases.withColumn("id",2020*10000+month("date")*100+dayofmonth("date")) 

dfmoon_phases.createOrReplaceTempView("stage_moon_phases")
dfmoon_phases = spark.sql( \
    " With univ as ( \
    select \
        id as id_Aux, \
        LAG(id, 1,0) OVER (ORDER BY id) AS id_From, \
        (id - 1) as id_TO, \
        LAG(percentage, 1,0) OVER (ORDER BY id) AS PreviousPct \
    from stage_moon_phases \
    where ifnull(percentage,-1) > 0 \
    and  1=1) \
    Select \
        id_Aux, \
        CASE \
            WHEN id_From = 0 then to_date(id_TO,'yyyyMMdd') \
            ELSE to_date(id_From,'yyyyMMdd') \
        END as id_From, \
        to_date(id_TO,'yyyyMMdd') as id_TO, \
        PreviousPct as percentage \
    from univ" \
     )

#dfmoon_phases.show(5)

dfmoon_phases = dfmoon_phases.withColumn('yr_seq', expr("sequence(id_From, id_TO, interval 1 day)")) \
    .withColumn("date",expr("explode(yr_seq)")) \
    .withColumn("DateID",date_format(col("date"),"yyyyMMdd").cast(IntegerType())) \
    .withColumn("monthnum",month(col("date"))) \
    .orderBy("DateID") \
    .drop("yr_seq","id_Aux","id_From","id_TO")

dfmoon_phases.printSchema()
dfmoon_phases.show()


root
 |-- percentage: double (nullable = true)
 |-- date: date (nullable = false)
 |-- DateID: integer (nullable = true)
 |-- monthnum: integer (nullable = false)



+----------+----------+--------+--------+
|percentage|      date|  DateID|monthnum|
+----------+----------+--------+--------+
|       0.0|2020-01-01|20200101|       1|
|       0.5|2020-01-02|20200102|       1|
|       0.5|2020-01-03|20200103|       1|
|       0.5|2020-01-04|20200104|       1|
|       0.5|2020-01-05|20200105|       1|
|       0.5|2020-01-06|20200106|       1|
|       0.5|2020-01-07|20200107|       1|
|       0.5|2020-01-08|20200108|       1|
|       0.5|2020-01-09|20200109|       1|
|       1.0|2020-01-10|20200110|       1|
|       1.0|2020-01-11|20200111|       1|
|       1.0|2020-01-12|20200112|       1|
|       1.0|2020-01-13|20200113|       1|
|       1.0|2020-01-14|20200114|       1|
|       1.0|2020-01-15|20200115|       1|
|       1.0|2020-01-16|20200116|       1|
|       0.5|2020-02-01|20200201|       2|
|       0.5|2020-02-02|20200202|       2|
|       0.5|2020-02-03|20200203|       2|
|       0.5|2020-02-04|20200204|       2|
+----------+----------+--------+--

 ## Load Tranformed data to Silver Layer

In [735]:
# Write to (managed) tables on dbMeteorShowers database

dfmoon_phases.write.mode("Overwrite")\
    .saveAsTable("dbsMeteorShowers.MoonPhases")


### Explore Silver layer information

In [736]:
%%sparksql

-- Explore Bronze layer information

select * 
from dbsMeteorShowers.MoonPhases
where DateID >= 20200116
LIMIT 20
;


0,1,2,3
percentage,date,DateID,monthnum
1.0,2020-01-16,20200116,1
0.5,2020-02-01,20200201,2
0.5,2020-02-02,20200202,2
0.5,2020-02-03,20200203,2
0.5,2020-02-04,20200204,2
0.5,2020-02-05,20200205,2
0.5,2020-02-06,20200206,2
0.5,2020-02-07,20200207,2
1.0,2020-02-08,20200208,2


# Exercise - Write a predictor function - Part 1

### Review our four datasets

In [738]:
dfmeteor_showers.printSchema()

root
 |-- name: string (nullable = true)
 |-- radiant: string (nullable = true)
 |-- bestmonth: string (nullable = true)
 |-- preferredhemisphere: string (nullable = true)
 |-- bestmonthnum: integer (nullable = true)
 |-- startmonthnum: integer (nullable = true)
 |-- endmonthnum: integer (nullable = true)
 |-- startdate: date (nullable = true)
 |-- enddate: date (nullable = true)
 |-- hemispherenum: integer (nullable = true)



In [739]:
dfmoon_phases.printSchema()

root
 |-- percentage: double (nullable = true)
 |-- date: date (nullable = false)
 |-- DateID: integer (nullable = true)
 |-- monthnum: integer (nullable = false)



In [741]:
dfcities.printSchema()

root
 |-- city: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- country: string (nullable = true)



In [742]:
dfconstellations.printSchema()

root
 |-- constellation: string (nullable = true)
 |-- bestmonth: string (nullable = true)
 |-- latitudestart: integer (nullable = true)
 |-- latitudeend: integer (nullable = true)
 |-- hemisphere: string (nullable = true)
 |-- bestmonthnum: integer (nullable = true)
 |-- hemispherenum: integer (nullable = true)



## Determine the latitude

In [753]:
# Create a function called predict_best_meteor_shower_viewing that takes in a city as a parameter:

def predict_best_meteor_shower_viewing(city):
    # Get the latitude of the city from the cities DataFrame
    latitude = dfcities.select("latitude").where(col("city") == city).collect()[0][0]

    return latitude


In [754]:
# Call the function

print(predict_best_meteor_shower_viewing('Abu Dhabi'))

24.47
