In [1]:
# Big Data Ex1 Avlas Kfir 060519071 - Google Maps API
#----------------------------------------------------

# import needed libraries

import os

import geocoder
import gmaps
from pyspark.sql import SparkSession, DataFrame

In [2]:
# Google Maps Platform API key, used for both geocoding and map rendering.
# It is read from the GOOGLE_API_KEY environment variable so the secret never
# lives in the notebook or its version history.
# NOTE: the key that used to be hard-coded here must be treated as leaked and
# should be revoked/rotated in the Google Cloud console.
API_KEY = os.environ.get("GOOGLE_API_KEY", "")
if not API_KEY:
    # Warn early: without a key the later Google API requests cannot succeed.
    print("WARNING: GOOGLE_API_KEY is not set; requests to the Google APIs will fail.")

In [3]:
# Load a CSV file into a Spark DataFrame through the Spark SQL reader

def read_file(p_path: str):
    """Read the CSV file at ``p_path`` and return it as a Spark DataFrame.

    The reader is asked to treat the first line as a header and to infer
    the column types from the data. Uses the ``spark`` session that is
    expected to exist as a global in the running kernel.
    """
    csv_options = {"inferSchema": True, "header": True}
    reader = spark.read
    return reader.csv(p_path, **csv_options)

In [4]:
# Run a SQL statement, publish its result as a temp view and hand back the DataFrame

def query_data(p_sql: str, p_view_name: str):
    """Execute ``p_sql`` and register the result under ``p_view_name``.

    The result is registered with ``createOrReplaceTempView``, so an existing
    temp view of the same name is replaced. Returns the resulting DataFrame.
    Uses the ``spark`` session that is expected to exist as a global.
    """
    result = spark.sql(p_sql)
    result.createOrReplaceTempView(p_view_name)
    return result

In [5]:
# Create covid19 main dataFrame and initial view

def read_covid19_data(p_path: str = "/home/spark-vm/PycharmProjects/BigDataCource/covid19.csv",
                      p_view_name: str = "xx_covid19_init_v"):
    """Load the covid19 CSV and register it as a temp view.

    Args:
        p_path: Location of the covid19 CSV file. Defaults to the path the
            notebook was originally written against; pass another path to
            run the notebook on a different machine.
        p_view_name: Name of the temp view to (re)create over the raw data.

    Returns:
        The DataFrame produced by ``read_file`` for ``p_path``.
    """
    data = read_file(p_path)
    data.createOrReplaceTempView(p_view_name)

    return data

In [6]:
# Load the raw covid19 CSV into df1. As a side effect this registers the
# "xx_covid19_init_v" temp view that the SQL cells below query.

df1 = read_covid19_data()

In [7]:
# Peek at the first five raw rows to check the loaded column names and values
df1.take(5)

[Row(dateRep='24-04-20', day=24, month=4, year=2020, cases=105, deaths=2, countriesAndTerritories='Afghanistan', geoId='AF', countryterritoryCode='AFG', popData2018=37172386, continentExp='Asia'),
 Row(dateRep='23-04-20', day=23, month=4, year=2020, cases=84, deaths=4, countriesAndTerritories='Afghanistan', geoId='AF', countryterritoryCode='AFG', popData2018=37172386, continentExp='Asia'),
 Row(dateRep='22-04-20', day=22, month=4, year=2020, cases=61, deaths=1, countriesAndTerritories='Afghanistan', geoId='AF', countryterritoryCode='AFG', popData2018=37172386, continentExp='Asia'),
 Row(dateRep='21-04-20', day=21, month=4, year=2020, cases=35, deaths=2, countriesAndTerritories='Afghanistan', geoId='AF', countryterritoryCode='AFG', popData2018=37172386, continentExp='Asia'),
 Row(dateRep='20-04-20', day=20, month=4, year=2020, cases=88, deaths=3, countriesAndTerritories='Afghanistan', geoId='AF', countryterritoryCode='AFG', popData2018=37172386, continentExp='Asia')]

In [8]:
# Prepare some covid19 dataFrames with manipulation and aggregation
#------------------------------------------------------------------

# 1. Create the basic view with covid19 data:
#    - rename the raw columns (continentExp, geoId, countriesAndTerritories),
#    - parse dateRep (format dd-MM-yy) into a proper date column,
#    - add deaths_percent = deaths / cases * 100 for each daily row.
# NOTE(review): deaths_percent divides by cases; rows where cases is 0
# presumably yield NULL under Spark's default (non-ANSI) settings - confirm.
# df1 is rebound here from the raw frame to the query result, which is also
# registered as the temp view "xx_covid19_v".

sql1 = (
        "SELECT continentExp as continent, "
                "geoId as country_id, "
                "countriesAndTerritories as country, "
                "to_date(dateRep, 'dd-MM-yy') as date, "
                "day, "
                "month, "
                "year, "
                "cases, "
                "deaths ,"
                "((deaths / cases) * 100) as deaths_percent "
           "FROM xx_covid19_init_v "
          "ORDER BY continent, country_id,date ASC"    )

df1 = query_data(p_sql = sql1, p_view_name = "xx_covid19_v")


In [9]:
# Aggregation by country

# Totals are summed over every daily report of a country. The "has any cases"
# condition is applied to the aggregated total (HAVING) instead of to the
# individual rows (WHERE): a row-level "cases > 0" filter would discard days
# that report deaths but no new cases, undercounting total_deaths.
sql2 = (
    "SELECT country, "
    "SUM(cases) AS total_cases, "
    "SUM(deaths) AS total_deaths "
    "FROM xx_covid19_v "
    "GROUP BY country "
    "HAVING SUM(cases) > 0 "
    "ORDER BY country"
)
# Result is also registered as the temp view "xx_covid19_country_agr_v"
df2 = query_data(p_sql = sql2, p_view_name = "xx_covid19_country_agr_v")


In [10]:
# Display the per-country aggregate's column names and types
df2

DataFrame[country: string, total_cases: bigint, total_deaths: bigint]

In [11]:
# First ten countries of the aggregate with their total cases and deaths
df2.take(10)

[Row(country='Afghanistan', total_cases=1281, total_deaths=42),
 Row(country='Albania', total_cases=663, total_deaths=27),
 Row(country='Algeria', total_cases=3007, total_deaths=407),
 Row(country='Andorra', total_cases=724, total_deaths=36),
 Row(country='Angola', total_cases=25, total_deaths=2),
 Row(country='Anguilla', total_cases=3, total_deaths=0),
 Row(country='Antigua_and_Barbuda', total_cases=24, total_deaths=2),
 Row(country='Argentina', total_cases=3423, total_deaths=158),
 Row(country='Armenia', total_cases=1596, total_deaths=27),
 Row(country='Aruba', total_cases=100, total_deaths=2)]

In [12]:
# Get Google maps coordinates by country name

def get_coordinates(t):
    """Geocode the country named in ``t[0]`` with the Google Places API.

    Args:
        t: A row/sequence whose first element is the country name, as
           produced by the per-country aggregate (country, total_cases,
           total_deaths).

    Returns:
        A ``(latitude, longitude)`` tuple, or ``(None, None)`` when the
        country name is missing or the lookup yields no coordinates.
    """
    country = t[0]
    if country is None:
        # Nothing to look up; avoid spending an API call on it.
        return (None, None)

    # The dataset spells multi-word names with underscores
    # (e.g. "Antigua_and_Barbuda"); use spaces so the name matches real places.
    query = str(country).replace("_", " ")

    result = geocoder.google(query, method = "places", key = API_KEY)
    coordinates = result.latlng

    # A failed lookup may come back as None or as an empty list; both mean
    # "no coordinates", and indexing an empty list would raise IndexError.
    if not coordinates or len(coordinates) < 2:
        return (None, None)

    return (coordinates[0], coordinates[1])

In [14]:
# Countries with fewer than 10000 cases, geocoded to (lat, lng) pairs

# Built as one chain from df2 so the cell is idempotent: re-running it never
# feeds already-geocoded (lat, lng) tuples back into get_coordinates.
# cache() lets later actions on this RDD (count(), toDF(), toPandas()) reuse
# the geocoding results instead of calling the Google API again per country.
cases_min = (
    df2.rdd
       .filter(lambda row: row[1] < 10000)      # row[1] is total_cases
       .map(get_coordinates)
       .filter(lambda latlng: latlng[0] is not None and latlng[1] is not None)
       .cache()
)

In [17]:
# Number of countries in the lowest bucket that were geocoded successfully
cases_min.count()

172

In [19]:
# Countries with at least 10000 and fewer than 30000 cases, geocoded to (lat, lng) pairs

# Built as one chain from df2 so the cell is idempotent: re-running it never
# feeds already-geocoded (lat, lng) tuples back into get_coordinates.
# cache() lets later actions on this RDD (count(), toDF(), toPandas()) reuse
# the geocoding results instead of calling the Google API again per country.
cases_med = (
    df2.rdd
       .filter(lambda row: 10000 <= row[1] < 30000)      # row[1] is total_cases
       .map(get_coordinates)
       .filter(lambda latlng: latlng[0] is not None and latlng[1] is not None)
       .cache()
)

In [22]:
# Number of countries in the middle bucket that were geocoded successfully
cases_med.count()

18

In [23]:
# Countries with at least 30000 cases, geocoded to (lat, lng) pairs

# Built as one chain from df2 so the cell is idempotent: re-running it never
# feeds already-geocoded (lat, lng) tuples back into get_coordinates.
# cache() lets later actions on this RDD (count(), toDF(), toPandas()) reuse
# the geocoding results instead of calling the Google API again per country.
cases_max = (
    df2.rdd
       .filter(lambda row: row[1] >= 30000)      # row[1] is total_cases
       .map(get_coordinates)
       .filter(lambda latlng: latlng[0] is not None and latlng[1] is not None)
       .cache()
)

In [26]:
# Number of countries in the highest bucket that were geocoded successfully
cases_max.count()

14

In [27]:
# Convert each bucket of (lat, lng) pairs to a pandas DataFrame for visualization

minDF, medDF, maxDF = (
    bucket.toDF().toPandas()
    for bucket in (cases_min, cases_med, cases_max)
)

In [30]:
# Coordinates of the lowest bucket; columns _1 / _2 hold latitude / longitude
minDF

Unnamed: 0,_1,_2
0,33.939110,67.709953
1,41.153332,20.168331
2,28.033886,1.659626
3,42.506285,1.521801
4,-11.202692,17.873887
...,...,...
167,6.423750,-66.589730
168,14.058324,108.277199
169,15.552727,48.516388
170,-13.133897,27.849332


In [31]:
# Coordinates of the middle bucket; columns _1 / _2 hold latitude / longitude
medDF

Unnamed: 0,_1,_2
0,47.516231,14.550072
1,-35.675147,-71.542969
2,-1.831239,-78.183406
3,20.593684,78.96288
4,53.142367,-7.692054
5,31.046051,34.851612
6,36.204824,138.252924
7,23.634501,-102.552784
8,30.375321,69.345116
9,-9.189967,-75.015152


In [32]:
# Coordinates of the highest bucket; columns _1 / _2 hold latitude / longitude
maxDF

Unnamed: 0,_1,_2
0,50.503887,4.469936
1,-14.235004,-51.92528
2,56.130366,-106.346771
3,35.86166,104.195397
4,46.227638,2.213749
5,51.165691,10.451526
6,32.427908,53.688046
7,41.87194,12.56738
8,52.132633,5.291266
9,61.52401,105.318756


In [28]:
# Configure Google API Key for the gmaps widgets.
# NOTE(review): this presumably has to run before the figure below is
# rendered - keep it ahead of the map cells when re-running the notebook.

gmaps.configure(api_key = API_KEY)

In [33]:
# Build one symbol layer per bucket, colour-coded by severity:
# green = lowest bucket, yellow = middle bucket, red = highest bucket

min_layer, med_layer, max_layer = (
    gmaps.symbol_layer(frame, fill_color = colour, stroke_color = colour, scale=2)
    for frame, colour in ((minDF, "green"), (medDF, "yellow"), (maxDF, "red"))
)


In [34]:
# Create the map figure that the symbol layers are added to
fig = gmaps.figure()

In [35]:
# Attach the three bucket layers to the figure, lowest bucket first
for layer in (min_layer, med_layer, max_layer):
    fig.add_layer(layer)

In [36]:
# Render the map with all three layers
fig

Figure(layout=FigureLayout(height='420px'))