## Overview

This notebook will show you how to create and query a table or DataFrame that you uploaded to DBFS. [DBFS](https://docs.databricks.com/user-guide/dbfs-databricks-file-system.html) is a Databricks File System that allows you to store data for querying inside of Databricks. This notebook assumes that you have a file already inside of DBFS that you would like to read from.

This notebook is written in **Python** so the default cell type is Python. However, you can use different languages by using the `%LANGUAGE` syntax. Python, Scala, SQL, and R are all supported.

In [2]:
# File location and type
file_location = "/FileStore/tables/corona_dataset_latest.csv"
file_type = "csv"

# CSV options
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
corona_df = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)

display(corona_df)

_c0,State,Country,Lat,Long,Date,Confirmed,Death,Recovered,state_cleaned,City
0,,Thailand,15.0,101.0,2020-01-22T00:00:00.000+0000,2,0,0,Bangkok,
1,,Japan,36.0,138.0,2020-01-22T00:00:00.000+0000,2,0,0,Hiraide,
2,,Singapore,1.2833,103.8333,2020-01-22T00:00:00.000+0000,0,0,0,Singapore,
3,,Nepal,28.1667,84.25,2020-01-22T00:00:00.000+0000,0,0,0,Kathmandu,
4,,Malaysia,2.5,112.5,2020-01-22T00:00:00.000+0000,0,0,0,Sarawak,
5,British Columbia,Canada,49.2827,-123.1207,2020-01-22T00:00:00.000+0000,0,0,0,British Columbia,
6,New South Wales,Australia,-33.8688,151.2093,2020-01-22T00:00:00.000+0000,0,0,0,New South Wales,
7,Victoria,Australia,-37.8136,144.9631,2020-01-22T00:00:00.000+0000,0,0,0,Victoria,
8,Queensland,Australia,-28.0167,153.4,2020-01-22T00:00:00.000+0000,0,0,0,Queensland,
9,,Cambodia,11.55,104.9167,2020-01-22T00:00:00.000+0000,0,0,0,Phnom Penh,


In [3]:
# File location and type
file_location = "/FileStore/tables/tweets.csv"
file_type = "csv"

# CSV options
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
twitter_df = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)

display(twitter_df)

_c0,geo,text,user,location,entities,sentiment,country
0,,What is God saying to us about #coronavirus ?,petodinice,Lagos,"[('about #', 'CARDINAL')]","{'neg': 0.0, 'neu': 0.769, 'pos': 0.231, 'compound': 0.2732}",Nigeria
1,,"""BREAKING: """"this is disappointing","but i took the test"""". Arsenal's Mikel Arteta tests positive for #coronavirus","-… """,JerryfranksonJF,"Abuja, Nigeria","""[(""""Arsenal's Mikel Arteta"""""
2,,#Coronavirus testing must be made free to the public if we are going to understand the scope of this crisis. Anything le…,cek422,"Pennsylvania, USA",[],"{'neg': 0.173, 'neu': 0.71, 'pos': 0.117, 'compound': -0.3767}",USA
3,,Get ready for mass event crowd cancellations across the World starting this weekend: cricket in #Australia in empty st…,InfectiousDz,NYC,"[('World', 'ORG'), ('this weekend', 'DATE'), '']","{'neg': 0.085, 'neu': 0.798, 'pos': 0.117, 'compound': 0.1779}",USA
4,,The #coronavirus pandemic is revealing just how closely we are all bound together...[A thread],vic_gibson,,[''],"{'neg': 0.0, 'neu': 1.0, 'pos': 0.0, 'compound': 0.0}",
5,,"COVID-19 update as of this morning:1. Death toll in Italy passes 1,0002. Arsenal's head coach Arteta tests positive3. US…",StewartNgilana,Durban | Port Elizabeth,"[('Italy', 'GPE'), ('1,0002', 'CARDINAL'), ('Arsenal', 'ORG'), ('Arteta', 'ORG'), ('US', 'GPE')]","{'neg': 0.178, 'neu': 0.822, 'pos': 0.0, 'compound': -0.5994}",USA
6,,"It’s painful to say, but as an ER doc who talked to about healthcare, I believe he's uniquely unqualified to…",BWheatnyc,Florida,[],"{'neg': 0.098, 'neu': 0.902, 'pos': 0.0, 'compound': -0.2382}",USA
7,,"📽️Friends, I will drop another video on Twitter tonight around 7:30pm to answer your questions about #coronavirus.Plea…",LorseaR,"New South Wales, Australia","[('Twitter', 'GPE'), ('tonight', 'TIME')]","{'neg': 0.123, 'neu': 0.877, 'pos': 0.0, 'compound': -0.2732}",Australia
8,,"Questions about COVID-19? See this resource and info page from NAfME for music educators, including music instrument hygiene: ht…",straightj23,"Columbus, OH","[('NAfME', 'CARDINAL')]","{'neg': 0.0, 'neu': 1.0, 'pos': 0.0, 'compound': 0.0}",USA
9,,"How they’re dealing with #coronavirus in a public facility in Portsmouth, Uk. Yes, that’s a coronavirus warning sig…",_______coolio,"South East, England","['', ('Portsmouth', 'GPE'), ('Uk', 'GPE')]","{'neg': 0.126, 'neu': 0.733, 'pos': 0.141, 'compound': 0.0772}",UK


In [4]:
temp_table_name = "corona"

corona_df.createOrReplaceTempView(temp_table_name)

In [5]:
%sql
select * from corona

_c0,State,Country,Lat,Long,Date,Confirmed,Death,Recovered,state_cleaned,City
0,,Thailand,15.0,101.0,2020-01-22T00:00:00.000+0000,2,0,0,Bangkok,
1,,Japan,36.0,138.0,2020-01-22T00:00:00.000+0000,2,0,0,Hiraide,
2,,Singapore,1.2833,103.8333,2020-01-22T00:00:00.000+0000,0,0,0,Singapore,
3,,Nepal,28.1667,84.25,2020-01-22T00:00:00.000+0000,0,0,0,Kathmandu,
4,,Malaysia,2.5,112.5,2020-01-22T00:00:00.000+0000,0,0,0,Sarawak,
5,British Columbia,Canada,49.2827,-123.1207,2020-01-22T00:00:00.000+0000,0,0,0,British Columbia,
6,New South Wales,Australia,-33.8688,151.2093,2020-01-22T00:00:00.000+0000,0,0,0,New South Wales,
7,Victoria,Australia,-37.8136,144.9631,2020-01-22T00:00:00.000+0000,0,0,0,Victoria,
8,Queensland,Australia,-28.0167,153.4,2020-01-22T00:00:00.000+0000,0,0,0,Queensland,
9,,Cambodia,11.55,104.9167,2020-01-22T00:00:00.000+0000,0,0,0,Phnom Penh,


In [6]:
temp_table_name = "twitter"

twitter_df.createOrReplaceTempView(temp_table_name)

In [7]:
%sql
select * from twitter

_c0,geo,text,user,location,entities,sentiment,country
0,,What is God saying to us about #coronavirus ?,petodinice,Lagos,"[('about #', 'CARDINAL')]","{'neg': 0.0, 'neu': 0.769, 'pos': 0.231, 'compound': 0.2732}",Nigeria
1,,"""BREAKING: """"this is disappointing","but i took the test"""". Arsenal's Mikel Arteta tests positive for #coronavirus","-… """,JerryfranksonJF,"Abuja, Nigeria","""[(""""Arsenal's Mikel Arteta"""""
2,,#Coronavirus testing must be made free to the public if we are going to understand the scope of this crisis. Anything le…,cek422,"Pennsylvania, USA",[],"{'neg': 0.173, 'neu': 0.71, 'pos': 0.117, 'compound': -0.3767}",USA
3,,Get ready for mass event crowd cancellations across the World starting this weekend: cricket in #Australia in empty st…,InfectiousDz,NYC,"[('World', 'ORG'), ('this weekend', 'DATE'), '']","{'neg': 0.085, 'neu': 0.798, 'pos': 0.117, 'compound': 0.1779}",USA
4,,The #coronavirus pandemic is revealing just how closely we are all bound together...[A thread],vic_gibson,,[''],"{'neg': 0.0, 'neu': 1.0, 'pos': 0.0, 'compound': 0.0}",
5,,"COVID-19 update as of this morning:1. Death toll in Italy passes 1,0002. Arsenal's head coach Arteta tests positive3. US…",StewartNgilana,Durban | Port Elizabeth,"[('Italy', 'GPE'), ('1,0002', 'CARDINAL'), ('Arsenal', 'ORG'), ('Arteta', 'ORG'), ('US', 'GPE')]","{'neg': 0.178, 'neu': 0.822, 'pos': 0.0, 'compound': -0.5994}",USA
6,,"It’s painful to say, but as an ER doc who talked to about healthcare, I believe he's uniquely unqualified to…",BWheatnyc,Florida,[],"{'neg': 0.098, 'neu': 0.902, 'pos': 0.0, 'compound': -0.2382}",USA
7,,"📽️Friends, I will drop another video on Twitter tonight around 7:30pm to answer your questions about #coronavirus.Plea…",LorseaR,"New South Wales, Australia","[('Twitter', 'GPE'), ('tonight', 'TIME')]","{'neg': 0.123, 'neu': 0.877, 'pos': 0.0, 'compound': -0.2732}",Australia
8,,"Questions about COVID-19? See this resource and info page from NAfME for music educators, including music instrument hygiene: ht…",straightj23,"Columbus, OH","[('NAfME', 'CARDINAL')]","{'neg': 0.0, 'neu': 1.0, 'pos': 0.0, 'compound': 0.0}",USA
9,,"How they’re dealing with #coronavirus in a public facility in Portsmouth, Uk. Yes, that’s a coronavirus warning sig…",_______coolio,"South East, England","['', ('Portsmouth', 'GPE'), ('Uk', 'GPE')]","{'neg': 0.126, 'neu': 0.733, 'pos': 0.141, 'compound': 0.0772}",UK


In [8]:
from pyspark.sql.functions import *

In [9]:
corona_df.show()

In [10]:
corona_df.count()

In [11]:
twitter_df.show()

In [12]:
twitter_df.count()

In [13]:
twitter_df.filter("country='USA'")

In [14]:
twitter_df.filter("country='USA'").show()

In [15]:
twitter_df.filter("country='USA' and location like 'New%'").show()

In [16]:
tw_filter_df = twitter_df.filter("country = 'USA'")

In [17]:
tw_filter_df.explain()

In [18]:
tw_filter_df = twitter_df.filter("country = 'USA' and location like '%New%'")

In [19]:
tw_filter_df.explain()

In [20]:
tw_filter_df.show(5)

In [21]:
tw_filter_df = twitter_df.filter(col("location").startswith("N"))

In [22]:
tw_filter_df.explain()

In [23]:
tw_filter_df.show()

In [24]:
twitter_df

In [25]:
twitter_df.first()

In [26]:
twitter_df.take(5)

In [27]:
x = spark.sparkContext.parallelize([1,4,8])
y = x.flatMap(lambda x: (x, x*x))
print(x.collect())
print(y.collect())

In [28]:
x = spark.sparkContext.parallelize([1,4,8])
y = x.map(lambda x: (x, x*x))
print(x.collect())
print(y.collect())

In [29]:
x

In [30]:
twitter_df.select("text").show()

In [31]:
twitter_df.select("text","user").show()

In [32]:
twitter_df.rdd.map(lambda line: line.text.split(" ")).take(5)

In [33]:
twitter_df.rdd.flatMap(lambda line: line.text.split(" ")).take(100)

In [34]:
corona_df.show()

In [35]:
corona_df.filter("Country = 'US'").show()

In [36]:
corona_df.filter("Country = 'US'").sort(col("Date"),ascending = False).show()

In [37]:
corona_df.filter("Country = 'US'").orderBy(col("Date"),ascending = False).show()

In [38]:
corona_df.filter("Country = 'US'").sortWithinPartitions([col("Date"),col("Confirmed")],ascending = False).show()

In [39]:
corona_df.describe().show()

In [40]:
corona_df.printSchema()

In [41]:
corona_df.filter('Confirmed > 10000').sort(col('Confirmed')).show()

In [42]:
corona_df.filter('Confirmed > 10000').approxQuantile('Confirmed',[0.25,0.5,0.75,0.9,0.95],0.9)

In [43]:
corona_df.agg({"Date" : "max"}).collect()

In [44]:
corona_df.agg({"Date" : "max", "confirmed":"max"}).collect()

In [45]:
max_date = corona_df.agg({"Date" : "max"})

In [46]:
max_date.show()

In [47]:
import pyspark.sql.functions as F

In [48]:
corona_df.groupBy("Country","State_Cleaned").agg(F.max("Date")).show()

In [49]:
corona_df.join(corona_df.groupBy("Country","State_Cleaned").agg(F.max("Date").alias("Date")),on = ['Country', 'State_cleaned', 'Date'], how = 'inner').show()

In [50]:
corona_df.join(corona_df.groupBy("Country","State_Cleaned").agg(F.max("Date").alias("Date")),on = ['Country', 'State_cleaned', 'Date'], how = 'inner').sort("Confirmed", ascending = False).show(10)

In [51]:
from pyspark.sql.functions import row_number
from pyspark.sql.window import Window

ws = Window().partitionBy("Country", "State_Cleaned").orderBy(col("Date").desc())

corona_df.withColumn("row_num", row_number().over(ws)).orderBy(col("Country")).show()

In [52]:
corona_df.withColumn("row_num", row_number().over(ws)).where(col("row_num")==1).show()

In [53]:
corona_max_df = corona_df.join(corona_df.groupBy("Country","State_Cleaned").agg(F.max("Date").alias("Date")),on = ['Country', 'State_cleaned', 'Date'], how = 'inner').sort("Confirmed", ascending = False)

In [54]:
corona_max_df.show()

In [55]:
corona_df.groupBy("Country").pivot("Date").agg(F.sum("Confirmed")).show()

In [56]:
corona_df.filter("Country == 'US'").crosstab("State","Date").show()

In [57]:
corona_max_df.groupBy("Country").agg({"Confirmed":"sum"}).show()

In [58]:
corona_max_df.groupBy("Country").agg({"Confirmed":"sum", "Recovered":"sum", "Death":"sum"}).orderBy("sum(Confirmed)",ascending = False).show()

In [59]:
corona_df.filter("Country == 'Italy'").sort("Date",ascending = False).show()

In [60]:
corona_max_df.withColumn("Active",corona_max_df.Confirmed - corona_max_df.Recovered - corona_max_df.Death).sort("Active",ascending = False).show()

In [61]:
 corona_max_df = corona_max_df.withColumn("Active",corona_max_df.Confirmed - corona_max_df.Recovered - corona_max_df.Death)

In [62]:
corona_max_df.show(10)

In [63]:
corona_max_df.groupBy("Country").sum("Active").orderBy("sum(Active)", ascending = False).show()

In [64]:
corona_max_df.select("Country","State_Cleaned","Confirmed","Recovered").filter(col("Country").isin("Australia","China")).groupBy('Country').sum().sort(asc("Country")).show()

In [65]:
corona_max_df.select("Country","State_Cleaned","Confirmed","Recovered").filter(col("Country").isin("Australia","China")).cube('Country').sum().sort(asc("Country")).show()

In [66]:
corona_max_df.select("Country","State_Cleaned","Confirmed","Recovered").filter(col("Country").isin("Australia","Canada")).cube('Country',"State_cleaned").sum().sort(asc("Country")).show(100)

In [67]:
corona_max_df.select("Country","State_Cleaned","Confirmed","Recovered").filter(col("Country").isin("Australia","Canada")).rollup('Country',"State_Cleaned").sum().sort(asc("Country")).show()

In [68]:
corona_max_df.corr('Confirmed','Recovered')

In [69]:
corona_max_df.cache()

In [70]:
%%timeit  
corona_max_df.count()

In [71]:
from pyspark import StorageLevel
corona_max_df.persist(StorageLevel.MEMORY_AND_DISK)

In [72]:
%timeit corona_max_df.count()

In [73]:
pd = corona_df.toPandas()

In [74]:
pd.corr()

Unnamed: 0,_c0,Lat,Long,Confirmed,Death,Recovered
_c0,1.0,-0.001594,-0.005303,0.046789,0.043155,0.042857
Lat,-0.001594,1.0,-0.383389,0.008531,0.007041,0.000778
Long,-0.005303,-0.383389,1.0,0.098893,0.074918,0.079047
Confirmed,0.046789,0.008531,0.098893,1.0,0.963376,0.853924
Death,0.043155,0.007041,0.074918,0.963376,1.0,0.857292
Recovered,0.042857,0.000778,0.079047,0.853924,0.857292,1.0


In [75]:
corona_max_df.createOrReplaceTempView("corona_max")

In [76]:
spark.sql("select * from corona_max").show()

In [77]:
%sql
select * from corona_max where Country in ('Australia', 'Canada') order by Country

Country,state_cleaned,Date,_c0,State,Lat,Long,Confirmed,Death,Recovered,City,Active
Australia,New South Wales,2020-03-20T00:00:00.000+0000,27672,New South Wales,-33.8688,151.2093,353,6,4,,343
Australia,Queensland,2020-03-20T00:00:00.000+0000,27674,Queensland,-28.0167,153.4,184,0,8,,176
Australia,Tasmania,2020-03-20T00:00:00.000+0000,27727,Tasmania,-41.4545,145.9707,10,0,3,,7
Australia,Victoria,2020-03-20T00:00:00.000+0000,27673,Victoria,-37.8136,144.9631,121,0,8,,113
Australia,Australian Capital Territory,2020-03-20T00:00:00.000+0000,28068,Australian Capital Territory,-35.4735,149.0124,6,0,0,,6
Australia,South Australia,2020-03-20T00:00:00.000+0000,27685,South Australia,-34.9285,138.6007,50,0,3,,47
Australia,Western Australia,2020-03-20T00:00:00.000+0000,27715,Western Australia,-31.9505,115.8605,64,1,0,,63
Australia,From Diamond Princess,2020-03-20T00:00:00.000+0000,27688,From Diamond Princess,35.4437,139.638,0,0,0,,0
Australia,Northern Territory,2020-03-20T00:00:00.000+0000,27737,Northern Territory,-12.4634,130.8456,3,0,0,,3
Canada,Ontario,2020-03-20T00:00:00.000+0000,27758,Ontario,51.2538,-85.3232,308,2,5,,301


In [78]:
%sql
select Country, State_Cleaned, SUM(Confirmed), SUM(Recovered) from corona_max where Country in ('Australia', 'Canada') GROUP BY ROLLUP (Country, State_Cleaned) order by Country

Country,State_Cleaned,sum(Confirmed),sum(Recovered)
,,1734,35
Australia,New South Wales,353,4
Australia,Australian Capital Territory,6,0
Australia,Western Australia,64,0
Australia,Northern Territory,3,0
Australia,Tasmania,10,3
Australia,,791,26
Australia,From Diamond Princess,0,0
Australia,Victoria,121,8
Australia,South Australia,50,3


In [79]:
%sql
select Country, State_Cleaned, SUM(Confirmed), SUM(Recovered) from corona_max where Country in ('Australia', 'Canada') GROUP BY CUBE (Country, State_Cleaned) order by Country

Country,State_Cleaned,sum(Confirmed),sum(Recovered)
,Northern Territory,3,0
,Queensland,184,8
,Grand Princess,10,0
,,1734,35
,Tasmania,10,3
,Saskatchewan,20,0
,From Diamond Princess,0,0
,British Columbia,271,4
,Nova Scotia,15,0
,Ontario,308,5
