# Summary
- Group 0 (Diane Woodbridge, Diane Woodrbige2, Diane Woodrbidge3)

- Data : Bing Corona Virus Search Data (https://github.com/microsoft/BingCoronavirusQuerySet)

- Cluster Setting : Multi Node, No Isolation Shared, Runtime : 11.0, Worker/Driver type : e2-highmem-2 (Min workers : 1, Max workers : 2 with autoscaling)

- To improve efficiency, configure to have spark.executor.memory and spark.driver.memory being 2g and .cache() intermediate rdd which being used often.

- Objectives : Using Bing Search Data relavant to Corona Virus in 2021 from various countries, see the search trends and plot using line/scatter/bubble plots.

> 1) Visualize trends of overall searches/searches by countries using line and scatter plots

> 2) Visualize keywords with most searches on a map.

In [0]:
from datetime import datetime

from geopy.geocoders import Nominatim
import numpy as np
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
import pyspark



# Check deafult configurations

In [0]:
sc._conf.getAll()

Out[2]: [('spark.databricks.preemption.enabled', 'true'),
 ('spark.sql.hive.metastore.jars', '/databricks/databricks-hive/*'),
 ('spark.driver.tempDirectory', '/local_disk0/tmp'),
 ('spark.databricks.clusterUsageTags.onInstancePool', 'false'),
 ('spark.sql.warehouse.dir', 'dbfs:/user/hive/warehouse'),
 ('spark.databricks.managedCatalog.clientClassName',
  'com.databricks.managedcatalog.ManagedCatalogClientImpl'),
 ('spark.hadoop.fs.gs.impl',
  'shaded.databricks.com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem'),
 ('spark.databricks.clusterUsageTags.clusterTargetWorkers', '1'),
 ('spark.hadoop.fs.fcfs-s3.impl.disable.cache', 'true'),
 ('spark.hadoop.fs.s3a.retry.limit', '20'),
 ('spark.sql.streaming.checkpointFileManagerClass',
  'com.databricks.spark.sql.streaming.DatabricksCheckpointFileManager'),
 ('spark.databricks.service.dbutils.repl.backend',
  'com.databricks.dbconnect.ReplDBUtils'),
 ('spark.hadoop.databricks.s3.verifyBucketExists.enabled', 'false'),
 ('spark.streaming.dr

# Set Configurations
Available parameters : https://spark.apache.org/docs/latest/configuration.html

In [0]:
sc._conf.get('spark.executor.memory')

Out[2]: '4g'

In [0]:
bing_inputs = sc.wholeTextFiles("gs://usf-msds694-data/Bing_Searches/*")

In [0]:
bing_inputs.collect()

Out[5]: [('gs://usf-msds694-data/Bing_Searches/QueriesByState_2021-06-01_2021-06-30.tsv',
  "2021-06-01\tcoronavirus statistiques\tFalse\tIle-De-France\tFrance\t1\n2021-06-01\tamelipro\tTrue\tNormandy\tFrance\t6\n2021-06-01\tnh unemployment login\tTrue\tNew Hampshire\tUnited States\t75\n2021-06-01\tlloyds online banking\tTrue\tDarlington\tUnited Kingdom\t50\n2021-06-01\tcovid eligibility checker\tFalse\tVictoria\tAustralia\t1\n2021-06-01\tvaccine\tTrue\tCalifornia\tUnited States\t12\n2021-06-01\tcovid vaccination booking\tFalse\tLondon\tUnited Kingdom\t1\n2021-06-01\tcoronavirus tracker johns hopkins csse\tFalse\tCalifornia\tUnited States\t1\n2021-06-01\tlegacy go health\tTrue\tOregon\tUnited States\t2\n2021-06-01\tnyheter idag\tTrue\tVarmland County\tSweden\t1\n2021-06-01\tflamengo\tTrue\tParana\tBrazil\t11\n2021-06-01\tuchealth myuchealth colorado\tTrue\tColorado\tUnited States\t48\n2021-06-01\tvacuna covid\tFalse\tDistrito Federal\tMexico\t4\n2021-06-01\tcoronavirus monde\tFalse\tIl

In [0]:
input = bing_inputs.map(lambda x: x[1]).flatMap(lambda x : x.split("\n"))

In [0]:
def retrieve_column(array, col):
    try:
        return array[col]
    except:
        pass

    
def convert_string_to_timestamp(string):
    try:
        return datetime.strptime(string, '%Y-%M-%d').date()
    except:
        pass


def string_to_int(x):
    try:
        return int(x)
    except:
        return 0

In [0]:
parsed_input = input.map(lambda x : x.split("\t"))\
                    .map(lambda x : [convert_string_to_timestamp(retrieve_column(x, 0)),
                                     retrieve_column(x, 1), 
                                     retrieve_column(x, 2),
                                     retrieve_column(x, 3),
                                     retrieve_column(x, 4), 
                                     string_to_int(retrieve_column(x,5))])\
                    .cache()

In [0]:
parsed_input.take(10)

Out[9]: [[datetime.date(2021, 1, 1),
  'coronavirus statistiques',
  'False',
  'Ile-De-France',
  'France',
  1],
 [datetime.date(2021, 1, 1), 'amelipro', 'True', 'Normandy', 'France', 6],
 [datetime.date(2021, 1, 1),
  'nh unemployment login',
  'True',
  'New Hampshire',
  'United States',
  75],
 [datetime.date(2021, 1, 1),
  'lloyds online banking',
  'True',
  'Darlington',
  'United Kingdom',
  50],
 [datetime.date(2021, 1, 1),
  'covid eligibility checker',
  'False',
  'Victoria',
  'Australia',
  1],
 [datetime.date(2021, 1, 1),
  'vaccine',
  'True',
  'California',
  'United States',
  12],
 [datetime.date(2021, 1, 1),
  'covid vaccination booking',
  'False',
  'London',
  'United Kingdom',
  1],
 [datetime.date(2021, 1, 1),
  'coronavirus tracker johns hopkins csse',
  'False',
  'California',
  'United States',
  1],
 [datetime.date(2021, 1, 1),
  'legacy go health',
  'True',
  'Oregon',
  'United States',
  2],
 [datetime.date(2021, 1, 1),
  'nyheter idag',
  'True',
 

# Display Daily Count (Diane Woodbridge)

Create a line chart showing daily counts

It shows there are patterns in 7 days, where Thursday has the highest value.

In [0]:
daily_count = parsed_input.map(lambda x : [x[0],x[5]])\
                          .reduceByKey(lambda x,y : x+y)\
                          .collect()

daily_count_df = pd.DataFrame(daily_count, columns=['date', 'count'])
fig = px.line(daily_count_df, x='date', y="count")
fig.show()

# Display Daily Count Per Country (Diane Woodbridge 2)
For US, France, UK, Australia and Taiwan, create a scatter plot showing daily counts.

Observation : It shows there are patterns in 7 days, where Thursday/Friday has the highest value. UK has the strongest market among all.

In [0]:
country_daily_count = parsed_input.map(lambda x : ((x[4], x[0]), x[5]))\
                                  .reduceByKey(lambda x, y : x+y)\
                                  .map(lambda x : [x[0][0], x[0][1], x[1]]).collect()

country_count_df = pd.DataFrame(country_daily_count, columns=['country', 'date', 'count'])
fig = px.scatter(country_count_df.query('country=="United States" or country== "France" or country=="United Kingdom" or country=="Australia" or country=="Taiwan"'), 
                 x="date", 
                 y="count",
                 color="country")
fig.show()


# Display a map plot (Diane Woodbridge)
Create a bubble map showing top 200 (country, word) pairs. 

Observation :  Overall there are many topics relevant to the pandemic, but shows differences depending on its own political/cultural issues such as Lloyds Online Banking in UK, costco online in the US.

In [0]:

geolocator = Nominatim(user_agent="user_name")

def return_with_lon_lat(vals):
    lon_lat = list()
    for val in vals:
        loc = geolocator.geocode(val[0])
        lon_lat.append([loc.longitude, loc.latitude])
    return np.append(np.array(vals), np.array(lon_lat), 1)
    
    
country_word_freq = parsed_input.map(lambda x : ((x[4], x[1]), x[-1]))\
                                .groupByKey()\
                                .mapValues(lambda x : sum(x))\
                                .map(lambda val : [val[0][0], val[0][1], val[1]])\
                                .sortBy(lambda x : x[2], ascending = False)\
                                .take(200)


In [0]:
limits = [(0,50),(50,100),(100,150),(150,200)]
colors = ["royalblue","crimson","lightseagreen","orange","lightgrey"]

fig = go.Figure(go.Scattergeo())
fig.update_layout(height=1000)

country_word_freq_df = pd.DataFrame(return_with_lon_lat(country_word_freq), columns=['country', 'keyword', 'count', 'lon', 'lat'])

for i in range(len(limits)):
    lim = limits[i]
    df_sub = country_word_freq_df[lim[0]:lim[1]]
    fig.add_trace(go.Scattergeo(lon = df_sub['lon'],
                                lat = df_sub['lat'],
                                text = df_sub['keyword'] + '<br>' + df_sub['country'] + '<br>' + df_sub['count'],
                                marker = dict(
                                    color = colors[i],
                                    size = df_sub['count'].astype('int32')/10,
                                    line_width=0.5,
                                    sizemode = 'area'
                            ),
                             name = 'Top {0} - {1}'.format(lim[0],lim[1])))


In [0]:
fig.show()