In [771]:
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
import plotly.io as pio
from pyspark.sql import SQLContext
from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType, IntegerType, StringType,TimestampType
from pyspark.sql.types import StructField, StructType
from sklearn import preprocessing

In [772]:
# Build the SQLContext that the cells below use to read the CSV files.
# NOTE(review): `sc` is not defined anywhere in this notebook — presumably the
# SparkContext injected by the PySpark kernel; confirm.
sql_c = SQLContext(sc)

In [773]:
# Explicit schema: one (column name, Spark type) pair per field, in order.
# NOTE(review): no reader in the visible cells is given this schema (the CSV is
# loaded with inferSchema=True) — confirm whether it is still needed.
_corona_columns = [
    ("dateRep", TimestampType()),
    ("day", IntegerType()),
    ("month", IntegerType()),
    ("year", IntegerType()),
    ("cases", IntegerType()),
    ("deaths", IntegerType()),
    ("countriesAndTerritories", StringType()),
    ("geoId", StringType()),
    ("countryterritoryCode", StringType()),
    ("popData2018", IntegerType()),
]
corona_data_schema = StructType(
    [StructField(col_name, col_type) for col_name, col_type in _corona_columns]
)

In [774]:
# Load the corona CSV: first line is the header, column types are inferred.
corona_data = (
    sql_c.read
    .options(header=True, inferSchema=True)
    .csv("corona_data.csv")
)

In [775]:
# Parse `dateRep` with the dd/MM/yyyy pattern into a timestamp column named `date`,
# then keep only that column plus deaths, cases, geoId and countriesAndTerritories.
report_date = (
    F.unix_timestamp('dateRep', "dd/MM/yyyy")
    .cast(TimestampType())
    .alias("date")
)
corona_data = corona_data.select(
    'deaths', 'cases', 'geoId', 'countriesAndTerritories', report_date
)

In [776]:
# Load the S&P CSV: first line is the header, column types are inferred.
sp_data = (
    sql_c.read
    .options(header=True, inferSchema=True)
    .csv("s&p.csv")
)

In [777]:
# Check the column types of the corona data after the date conversion.
corona_data.printSchema()

root
 |-- deaths: integer (nullable = true)
 |-- cases: integer (nullable = true)
 |-- geoId: string (nullable = true)
 |-- countriesAndTerritories: string (nullable = true)
 |-- date: timestamp (nullable = true)



In [778]:
# Preview the leading rows of the corona data.
corona_data.show()

+------+-----+-----+-----------------------+-------------------+
|deaths|cases|geoId|countriesAndTerritories|               date|
+------+-----+-----+-----------------------+-------------------+
|     1|   51|   AF|            Afghanistan|2020-04-18 00:00:00|
|     4|   10|   AF|            Afghanistan|2020-04-17 00:00:00|
|     2|   70|   AF|            Afghanistan|2020-04-16 00:00:00|
|     2|   49|   AF|            Afghanistan|2020-04-15 00:00:00|
|     3|   58|   AF|            Afghanistan|2020-04-14 00:00:00|
|     0|   52|   AF|            Afghanistan|2020-04-13 00:00:00|
|     3|   34|   AF|            Afghanistan|2020-04-12 00:00:00|
|     0|   37|   AF|            Afghanistan|2020-04-11 00:00:00|
|     1|   61|   AF|            Afghanistan|2020-04-10 00:00:00|
|     3|   56|   AF|            Afghanistan|2020-04-09 00:00:00|
|     4|   30|   AF|            Afghanistan|2020-04-08 00:00:00|
|     0|   38|   AF|            Afghanistan|2020-04-07 00:00:00|
|     2|   29|   AF|     

In [779]:
# Check the column types inferred for the S&P data.
sp_data.printSchema()

root
 |-- Date: timestamp (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Adj Close: double (nullable = true)
 |-- Volume: long (nullable = true)



In [780]:
# Preview the leading rows of the S&P data.
sp_data.show()

+-------------------+-----------+-----------+-----------+-----------+-----------+----------+
|               Date|       Open|       High|        Low|      Close|  Adj Close|    Volume|
+-------------------+-----------+-----------+-----------+-----------+-----------+----------+
|2019-04-22 00:00:00|2898.780029| 2909.51001|2896.350098|2907.969971|2907.969971|2997950000|
|2019-04-23 00:00:00| 2909.98999|2936.310059|2908.530029|2933.679932|2933.679932|3635030000|
|2019-04-24 00:00:00|     2934.0|2936.830078|2926.050049|    2927.25|    2927.25|3448960000|
|2019-04-25 00:00:00| 2928.98999|2933.100098|2912.840088|2926.169922|2926.169922|3425280000|
|2019-04-26 00:00:00|2925.810059|2939.879883|2917.560059|2939.879883|2939.879883|3248500000|
|2019-04-29 00:00:00|2940.580078| 2949.52002|2939.350098|2943.030029|2943.030029|3118780000|
|2019-04-30 00:00:00|2937.139893|2948.219971|2924.110107|2945.830078|2945.830078|3919330000|
|2019-05-01 00:00:00|2952.330078|2954.129883|2923.360107| 2923.72998| 

In [781]:
# Register the corona DataFrame as the temp view `corona_data` so the spark.sql
# queries below can reference it by name.
corona_data.createOrReplaceTempView("corona_data")

In [688]:
# Register the S&P DataFrame as the temp view `sp500` so the spark.sql
# query below can join against it by name.
sp_data.createOrReplaceTempView("sp500")

In [689]:
# Let's look at the S&P 500 data joined with the corona data by date.

In [784]:
# Daily US corona figures joined with the S&P 500 close for the same date.
# JOIN without a qualifier is an inner join, so only dates present in both
# views are kept.
# The lower date bound is an explicit TIMESTAMP compared with >=, so that
# 2020-02-28 is kept (as in the output recorded below) no matter how the Spark
# version coerces a timestamp-vs-string comparison.
sqlDF = spark.sql(
    """
    SELECT deaths, Close, corona_data.date, cases
    FROM corona_data
    JOIN sp500 ON corona_data.date = sp500.date
    WHERE geoId = 'US'
      AND corona_data.date >= CAST('2020-02-28' AS TIMESTAMP)
    ORDER BY date ASC
    """
)

In [856]:
# Preview the joined US corona / S&P 500 rows.
sqlDF.show()

+------+-----------+-------------------+-----+
|deaths|      Close|               date|cases|
+------+-----------+-------------------+-----+
|     0|2954.219971|2020-02-28 00:00:00|    1|
|     1| 3090.22998|2020-03-02 00:00:00|   20|
|     4|3003.370117|2020-03-03 00:00:00|   14|
|     3|3130.120117|2020-03-04 00:00:00|   22|
|     2|3023.939941|2020-03-05 00:00:00|   34|
|     1|2972.370117|2020-03-06 00:00:00|   74|
|     4|2746.560059|2020-03-09 00:00:00|  121|
|     5| 2882.22998|2020-03-10 00:00:00|  200|
|     2|2741.379883|2020-03-11 00:00:00|  271|
|     2|2480.639893|2020-03-12 00:00:00|  287|
|    10| 2711.02002|2020-03-13 00:00:00|  351|
|    12|2386.129883|2020-03-16 00:00:00|  823|
|    16|2529.189941|2020-03-17 00:00:00|  887|
|    23|2398.100098|2020-03-18 00:00:00| 1766|
|    42|2409.389893|2020-03-19 00:00:00| 2988|
|     0|2304.919922|2020-03-20 00:00:00| 4835|
|   131|2237.399902|2020-03-23 00:00:00| 8459|
|   119|2447.330078|2020-03-24 00:00:00|11236|
|   211|2475.

In [786]:
# Line chart of the S&P 500 close over the joined date range.
# Convert to pandas once: each toPandas() call runs the Spark query again.
sp_close_pdf = sqlDF.toPandas()
fig = go.Figure(go.Scatter(x=sp_close_pdf['date'], y=sp_close_pdf['Close']))
fig.update_layout(title_text='s&p500')
# NOTE(review): `filename` looks like a leftover from another notebook —
# confirm whether the active renderer needs it.
pio.show(fig, filename="spark/sample_rides")

In [808]:
# Scatter (markers only) of the daily US deaths over the joined date range.
# Convert to pandas once: each toPandas() call runs the Spark query again.
daily_deaths_pdf = sqlDF.toPandas()
fig = go.Figure(
    go.Scatter(x=daily_deaths_pdf['date'], y=daily_deaths_pdf['deaths'], mode='markers')
)
fig.update_layout(title_text='Deaths by day')
# NOTE(review): `filename` looks like a leftover from another notebook —
# confirm whether the active renderer needs it.
pio.show(fig, filename="spark/sample_rides")

In [788]:
# Min-max scale Close, deaths and cases so the three series can share one y-axis.
column_names_to_normalize = ['Close', 'deaths','cases']
# The scaler is not created anywhere else in the notebook; build it here so the
# cell also runs on a fresh kernel.
min_max_scaler = preprocessing.MinMaxScaler()
# Convert to pandas once and reuse it for both the values and the index.
us_market_pdf = sqlDF.toPandas()
x = us_market_pdf[column_names_to_normalize].values
x_scaled = min_max_scaler.fit_transform(x)
normailze_df = pd.DataFrame(
    x_scaled,
    columns=column_names_to_normalize,
    index=us_market_pdf.index,
)


In [789]:
# Overlay the three normalised series (Close, cases, deaths) on a shared date axis.
date = sqlDF.toPandas()['date']
fig = go.Figure()
fig.update_layout(title_text='S&P500 compared with death and cases of Corona in the united states')
# One line trace per normalised column, in the same order as before.
for column, label in (
    ('Close', 'closed S&P500'),
    ('cases', 'cases of Corona in USA'),
    ('deaths', 'deaths from Corona in USA'),
):
    fig.add_trace(go.Scatter(x=date, y=normailze_df[column], mode='lines', name=label))
pio.show(fig, filename="spark/sample_rides")

In [696]:
# You can see that there is almost no correlation between the S&P 500 and the
# number of corona deaths and cases in the United States.

In [809]:
# Daily US cases and deaths plus a running total of deaths ordered by date.
# The lower date bound is an explicit TIMESTAMP compared with >=, so that
# 2020-02-28 is kept (as in the output recorded below) no matter how the Spark
# version coerces a timestamp-vs-string comparison.
deaths_sum_DF = spark.sql(
    """
    SELECT date,
           SUM(deaths) OVER (ORDER BY date) AS deaths_sum,
           cases,
           deaths
    FROM corona_data
    WHERE geoId = 'US'
      AND date >= CAST('2020-02-28' AS TIMESTAMP)
    ORDER BY date ASC
    """
)


In [810]:
# Preview the daily US figures with the running deaths total.
deaths_sum_DF.show()

+-------------------+----------+-----+------+
|               date|deaths_sum|cases|deaths|
+-------------------+----------+-----+------+
|2020-02-28 00:00:00|         0|    1|     0|
|2020-02-29 00:00:00|         0|    6|     0|
|2020-03-01 00:00:00|         1|    3|     1|
|2020-03-02 00:00:00|         2|   20|     1|
|2020-03-03 00:00:00|         6|   14|     4|
|2020-03-04 00:00:00|         9|   22|     3|
|2020-03-05 00:00:00|        11|   34|     2|
|2020-03-06 00:00:00|        12|   74|     1|
|2020-03-07 00:00:00|        14|  105|     2|
|2020-03-08 00:00:00|        17|   95|     3|
|2020-03-09 00:00:00|        21|  121|     4|
|2020-03-10 00:00:00|        26|  200|     5|
|2020-03-11 00:00:00|        28|  271|     2|
|2020-03-12 00:00:00|        30|  287|     2|
|2020-03-13 00:00:00|        40|  351|    10|
|2020-03-14 00:00:00|        47|  511|     7|
|2020-03-15 00:00:00|        57|  777|    10|
|2020-03-16 00:00:00|        69|  823|    12|
|2020-03-17 00:00:00|        85|  

In [795]:
# Bar chart of the running total of US deaths per day.
# Previously a titled Scatter figure was built and then `fig` was overwritten by
# an untitled bar chart, so the title never reached the displayed figure.
# Build the bar chart once and apply the title to it.
# Convert to pandas once: each toPandas() call runs the Spark query again.
deaths_cumulative_pdf = deaths_sum_DF.toPandas()
fig = px.bar(x=deaths_cumulative_pdf['date'], y=deaths_cumulative_pdf['deaths_sum'])
fig.update_layout(title_text='Deaths Sum by day')
# NOTE(review): `filename` looks like a leftover from another notebook —
# confirm whether the active renderer needs it.
pio.show(fig, filename="spark/sample_rides")

In [None]:
# As we can see in the graph above, the number of deaths grew exponentially in the United States.

In [817]:
# Stacked bars of daily US cases and deaths.
# Convert to pandas once: each toPandas() call runs the Spark query again.
daily_us_pdf = deaths_sum_DF.toPandas()
fig = go.Figure(data=[
    go.Bar(name='cases', x=daily_us_pdf['date'], y=daily_us_pdf['cases']),
    go.Bar(name='deaths', x=daily_us_pdf['date'], y=daily_us_pdf['deaths']),
])
fig.update_layout(
    title_text='deaths compare to cases in the united states',
    barmode='stack',
)
fig.show()

In [832]:
 # Load the per-country latitude/longitude CSV: header row, inferred column types.
 countries_df = sql_c.read.options(header=True, inferSchema=True).csv("average-latitude-longitude-countries.csv")

In [834]:
# Preview the leading rows of the country coordinates table.
countries_df.show()

+------------+--------------------+--------+---------+
|country_code|             Country|Latitude|Longitude|
+------------+--------------------+--------+---------+
|          AD|             Andorra|    42.5|      1.5|
|          AE|United Arab Emirates|    24.0|     54.0|
|          AF|         Afghanistan|    33.0|     65.0|
|          AG| Antigua and Barbuda|   17.05|    -61.8|
|          AI|            Anguilla|   18.25|   -63.17|
|          AL|             Albania|    41.0|     20.0|
|          AM|             Armenia|    40.0|     45.0|
|          AN|Netherlands Antilles|   12.25|   -68.75|
|          AO|              Angola|   -12.5|     18.5|
|          AP| Asia/Pacific Region|    35.0|    105.0|
|          AQ|          Antarctica|   -90.0|      0.0|
|          AR|           Argentina|   -34.0|    -64.0|
|          AS|      American Samoa|  -14.33|   -170.0|
|          AT|             Austria|   47.33|    13.33|
|          AU|           Australia|   -27.0|    133.0|
|         

In [835]:
# Register the coordinates DataFrame as the temp view `countries` so the
# spark.sql query below can join against it by name.
countries_df.createOrReplaceTempView("countries")

In [844]:
# Total cases per country, with the Latitude/Longitude taken from the countries
# view (joined on country_code = geoId).
# NOTE(review): despite the variable name, the query sums `cases`, not `deaths`.
deaths_sum_by_contry_DF = spark.sql(
    "SELECT countriesAndTerritories, "
    " SUM(cases), first(Latitude) as Latitude, first(Longitude) as Longitude"
    " FROM corona_data join countries on country_code=geoId"
    " group by countriesAndTerritories"
)

In [845]:
# Preview the per-country totals with their coordinates.
deaths_sum_by_contry_DF.show()

+-----------------------+----------+--------+---------+
|countriesAndTerritories|sum(cases)|Latitude|Longitude|
+-----------------------+----------+--------+---------+
|                   Chad|        33|    15.0|     19.0|
|               Anguilla|         3|   18.25|   -63.17|
|               Paraguay|       202|   -23.0|    -58.0|
|                 Russia|     32008|    60.0|    100.0|
|                  Yemen|         1|    15.0|     48.0|
|           Burkina_Faso|       557|    13.0|     -2.0|
|                Senegal|       342|    14.0|    -14.0|
|                 Sweden|     13216|    62.0|     15.0|
|                 Guyana|        63|     5.0|    -59.0|
|                Eritrea|        35|    15.0|     39.0|
|            Philippines|      5878|    13.0|    122.0|
|               Djibouti|       732|    11.5|     43.0|
|               Malaysia|      5251|     2.5|    112.5|
|           Sierra_Leone|        26|     8.5|    -11.5|
|              Singapore|      5050|    1.37|   

In [None]:
# TODO: take the table above and plot it on a map.

In [848]:
from bokeh.transform import linear_cmap
from bokeh.palettes import Plasma256 as palette
from bokeh.models import ColorBar

# we are adding the dataframe as a parameter, 
# since we are now going to plot
# a different dataframe
def plot(df, lat, lng, zoom=10, map_type='roadmap'):
    """Draw the rows of ``df`` as coloured circles on a Google Maps background.

    Args:
        df: Data wrapped in a ``ColumnDataSource``. The glyphs, colour mapper
            and hover tooltips reference the columns ``lon``, ``lat``,
            ``radius``, ``pricem2``, ``price``, ``area_build`` and ``area_tot``.
        lat: Latitude of the map centre.
        lng: Longitude of the map centre.
        zoom: Initial zoom level passed to ``GMapOptions``.
        map_type: Map style passed to ``GMapOptions``.

    Returns:
        The bokeh figure, after it has been shown.

    Notes:
        Relies on the notebook-level names ``api_key``, ``bokeh_width`` and
        ``bokeh_height`` (and on ``linear_cmap``, ``palette`` and ``ColorBar``
        imported in this cell); they must exist before the function is called.
        NOTE(review): the title, tooltips and colour range all refer to
        price columns, which the corona tables in this notebook do not
        have — presumably this still has to be adapted; confirm.
    """
    # These names are not imported anywhere else in the notebook, so import
    # them here to avoid a NameError when the function is called.
    from bokeh.models import ColumnDataSource, GMapOptions, HoverTool
    from bokeh.plotting import gmap, show

    gmap_options = GMapOptions(lat=lat, lng=lng,
                               map_type=map_type, zoom=zoom)
    hover = HoverTool(
        tooltips = [
            ('price', '@price euros'),
            # the {0.} means that we don't want decimals
            # for 1 decimal, write {0.0}
            ('price/m2', '@pricem2{0.}'),
            ('building', '@area_build m2'),
            ('terrain', '@area_tot m2'),
        ]
    )
    p = gmap(api_key, gmap_options, title='Pays de Gex',
             width=bokeh_width, height=bokeh_height,
             tools=[hover, 'reset', 'wheel_zoom', 'pan'])
    source = ColumnDataSource(df)
    # defining a color mapper, that will map values of pricem2
    # between 2000 and 8000 on the color palette
    mapper = linear_cmap('pricem2', palette, 2000., 8000.)
    # we use the mapper for the color of the circles
    center = p.circle('lon', 'lat', radius='radius', alpha=0.6,
                      color=mapper, source=source)
    # and we add a color scale to see which values the colors
    # correspond to
    color_bar = ColorBar(color_mapper=mapper['transform'],
                         location=(0,0))
    p.add_layout(color_bar, 'right')
    show(p)
    return p

In [861]:
# Correlation between daily US cases and the S&P 500 close over the joined dates.
sqlDF.stat.corr( "cases","Close")

-0.07828078235969915

In [862]:
# Correlation between daily US deaths and the S&P 500 close over the joined dates.
sqlDF.stat.corr( "deaths","Close")

0.14802150813619777

In [None]:
# In the examples above, the returned value is the Pearson correlation, which measures linear correlation.
# In both cases the absolute value lies below 0.29, which is said to be a small correlation.