In [1]:

import findspark 
findspark.init()
from pyspark.sql import SparkSession


spark = SparkSession.builder.appName("COVID-19 Data Analysis").getOrCreate()

if 'spark' in locals() and isinstance(spark, SparkSession):
    print("SparkSession is active and ready to use.")
else:
    print("SparkSession is not active. Please create a SparkSession.")

SparkSession is active and ready to use.


In [2]:
import pandas as pd
vac = pd.read_csv('https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/KpHDlIzdtR63BdTofl1mOg/owid-covid-latest.csv')
vac.sample(7)

Unnamed: 0,iso_code,continent,location,last_updated_date,total_cases,new_cases,new_cases_smoothed,total_deaths,new_deaths,new_deaths_smoothed,...,male_smokers,handwashing_facilities,hospital_beds_per_thousand,life_expectancy,human_development_index,population,excess_mortality_cumulative_absolute,excess_mortality_cumulative,excess_mortality,excess_mortality_cumulative_per_million
240,VEN,South America,Venezuela,2024-08-04,552695.0,0.0,0.0,5856.0,0.0,0.0,...,,,0.8,72.06,0.711,28301700.0,,,,
34,BFA,Africa,Burkina Faso,2024-08-04,22139.0,0.0,0.0,400.0,0.0,0.0,...,23.9,11.877,0.4,61.58,0.452,22673764.0,,,,
171,PAN,North America,Panama,2024-08-04,1044821.0,0.0,0.0,8748.0,0.0,0.0,...,9.9,,2.3,78.51,0.815,4408582.0,,,,
59,DMA,North America,Dominica,2024-08-04,16047.0,0.0,0.0,74.0,0.0,0.0,...,,,3.8,75.0,0.742,72758.0,,,,
11,ABW,North America,Aruba,2024-08-04,44224.0,0.0,0.0,292.0,0.0,0.0,...,,,,76.29,,106459.0,,,,
124,LBY,Africa,Libya,2024-08-04,507269.0,0.0,0.0,6437.0,0.0,0.0,...,,,3.7,72.91,0.724,6812344.0,,,,
236,URY,South America,Uruguay,2024-08-04,1041346.0,15.0,2.143,7682.0,0.0,0.0,...,19.9,,2.8,77.91,0.817,3422796.0,,,,


In [3]:
print(vac.shape)
print(vac[['continent', 'total_cases', 'total_deaths', 'total_vaccinations', 'population']].dtypes)
vac.loc[44:54,['continent', 'total_cases', 'total_deaths', 'total_vaccinations', 'population']]

(247, 67)
continent              object
total_cases           float64
total_deaths          float64
total_vaccinations    float64
population            float64
dtype: object


Unnamed: 0,continent,total_cases,total_deaths,total_vaccinations,population
44,Asia,99373219.0,122304.0,,1425887000.0
45,South America,6391876.0,142727.0,,51874030.0
46,Africa,9109.0,160.0,,836783.0
47,Africa,25227.0,389.0,,5970430.0
48,Oceania,7345.0,2.0,,17032.0
49,North America,1234701.0,9372.0,,5180836.0
50,Africa,88434.0,835.0,,28160550.0
51,Europe,1317144.0,18752.0,,4030361.0
52,North America,1113662.0,8530.0,,11212200.0
53,North America,45883.0,305.0,,191173.0


In [4]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, LongType, DateType

In [5]:
schema = StructType([
    StructField("location", StringType(), True),
    StructField("continent", StringType(), True),
    StructField("total_cases", LongType(), True),
    StructField("total_deaths", LongType(), True),
    StructField("total_vaccinations", LongType(), True),
    StructField("population", LongType(), True)
])

vac['location'] = vac['location'].astype(str)
vac['continent'] = vac['continent'].astype(str)
vac['total_cases'] = vac['total_cases'].fillna(0).astype('int64') 
vac['total_deaths'] = vac['total_deaths'].fillna(0).astype('int64')
vac['total_vaccinations'] = vac['total_vaccinations'].fillna(0).astype('int64')  
vac['population'] = vac['population'].fillna(0).astype('int64')  

vsp = spark.createDataFrame(vac[schema.fieldNames()])  
print(type(vsp))
# vsp.show(40)

<class 'pyspark.sql.dataframe.DataFrame'>


In [6]:
vsp.printSchema()

root
 |-- location: string (nullable = true)
 |-- continent: string (nullable = true)
 |-- total_cases: long (nullable = true)
 |-- total_deaths: long (nullable = true)
 |-- total_vaccinations: long (nullable = true)
 |-- population: long (nullable = true)



In [7]:
vsp.select('location','total_cases','population').show()

+-------------------+-----------+----------+
|           location|total_cases|population|
+-------------------+-----------+----------+
|        Afghanistan|     235214|  41128772|
|             Africa|   13145380|1426736614|
|            Albania|     335047|   2842318|
|            Algeria|     272139|  44903228|
|     American Samoa|       8359|     44295|
|            Andorra|      48015|     79843|
|             Angola|     107481|  35588996|
|           Anguilla|       3904|     15877|
|Antigua and Barbuda|       9106|     93772|
|          Argentina|   10101218|  45510324|
|            Armenia|     452273|   2780472|
|              Aruba|      44224|    106459|
|               Asia|  301499099|4721383370|
|          Australia|   11861161|  26177410|
|            Austria|    6082444|   8939617|
|         Azerbaijan|     835757|  10358078|
|            Bahamas|      39127|    409989|
|            Bahrain|     696614|   1472237|
|         Bangladesh|    2051348| 171186368|
|         

In [8]:
vsp.filter(vsp['population']>200000000).show()

+--------------------+-------------+-----------+------------+------------------+----------+
|            location|    continent|total_cases|total_deaths|total_vaccinations|population|
+--------------------+-------------+-----------+------------+------------------+----------+
|              Africa|          nan|   13145380|      259117|                 0|1426736614|
|                Asia|          nan|  301499099|     1637249|        9104304615|4721383370|
|              Brazil|South America|   37511921|      702116|                 0| 215313504|
|               China|         Asia|   99373219|      122304|                 0|1425887360|
|              Europe|          nan|  252916868|     2102483|        1399334208| 744807803|
| European Union (27)|          nan|  185822587|     1262988|         951113290| 450146793|
|High-income count...|          nan|  429044049|     2997359|        2840880020|1250514600|
|               India|         Asia|   45041748|      533623|        2206868000|

In [9]:
pvsp = vsp.withColumn('perc_deaths',vsp['total_deaths']/vsp['total_cases']*100)
pvsp.show()

+-------------------+-------------+-----------+------------+------------------+----------+-------------------+
|           location|    continent|total_cases|total_deaths|total_vaccinations|population|        perc_deaths|
+-------------------+-------------+-----------+------------+------------------+----------+-------------------+
|        Afghanistan|         Asia|     235214|        7998|                 0|  41128772|  3.400307804807537|
|             Africa|          nan|   13145380|      259117|                 0|1426736614| 1.9711640135165358|
|            Albania|       Europe|     335047|        3605|                 0|   2842318| 1.0759684462179933|
|            Algeria|       Africa|     272139|        6881|                 0|  44903228|   2.52848728039715|
|     American Samoa|      Oceania|       8359|          34|                 0|     44295|0.40674721856681423|
|            Andorra|       Europe|      48015|         159|                 0|     79843|  0.331146516713527|
|

In [10]:
pvsp.groupby('continent').agg({"total_deaths": "SUM"}).show()

+-------------+-----------------+
|    continent|sum(total_deaths)|
+-------------+-----------------+
|       Europe|          2102483|
|       Africa|           259117|
|          nan|         22430618|
|North America|          1671178|
|South America|          1354187|
|      Oceania|            32918|
|         Asia|          1637249|
+-------------+-----------------+



In [11]:
pvsp.createTempView('vakt')
spark.sql("SELECT * from vakt WHERE continent='Asia'").show()

+-----------+---------+-----------+------------+------------------+----------+--------------------+
|   location|continent|total_cases|total_deaths|total_vaccinations|population|         perc_deaths|
+-----------+---------+-----------+------------+------------------+----------+--------------------+
|Afghanistan|     Asia|     235214|        7998|                 0|  41128772|   3.400307804807537|
|    Armenia|     Asia|     452273|        8777|                 0|   2780472|  1.9406420458439926|
| Azerbaijan|     Asia|     835757|       10353|                 0|  10358078|  1.2387571985636974|
|    Bahrain|     Asia|     696614|        1536|                 0|   1472237| 0.22049513790994724|
| Bangladesh|     Asia|    2051348|       29499|                 0| 171186368|  1.4380300173349427|
|     Bhutan|     Asia|      62697|          21|                 0|    782457| 0.03349442557060146|
|     Brunei|     Asia|     347723|         179|                 0|    449002|0.051477756720147934|


In [14]:
pvsp.sort('population').show(300)

+--------------------+-------------+-----------+------------+------------------+----------+--------------------+
|            location|    continent|total_cases|total_deaths|total_vaccinations|population|         perc_deaths|
+--------------------+-------------+-----------+------------+------------------+----------+--------------------+
|            Pitcairn|      Oceania|          4|           0|                 0|        47|                 0.0|
|             Vatican|       Europe|         26|           0|                 0|       808|                 0.0|
|             Tokelau|      Oceania|         80|           0|                 0|      1893|                 0.0|
|                Niue|      Oceania|       1074|           0|                 0|      1952|                 0.0|
|    Falkland Islands|South America|       1923|           0|                 0|      3801|                 0.0|
|          Montserrat|North America|       1403|           8|                 0|      4413|  0.5

### Regulisanje Nan(py)/Null(sql) vrednosti

In [28]:
spark.sql('SELECT * from vakt where perc_deaths is NULL').show()

+------------+---------+-----------+------------+------------------+----------+-----------+
|    location|continent|total_cases|total_deaths|total_vaccinations|population|perc_deaths|
+------------+---------+-----------+------------+------------------+----------+-----------+
|   Hong Kong|     Asia|          0|           0|          21014839|   7488863|       NULL|
| North Korea|     Asia|          0|           0|                 0|  26069416|       NULL|
|Turkmenistan|     Asia|          0|           0|                 0|   6430777|       NULL|
+------------+---------+-----------+------------+------------------+----------+-----------+



In [35]:
pvsp.fillna({'perc_deaths':0.0})
pvsp.createTempView('vax')
spark.sql('SELECT * from vax where perc_deaths is NULL').show()

+--------+---------+-----------+------------+------------------+----------+-----------+
|location|continent|total_cases|total_deaths|total_vaccinations|population|perc_deaths|
+--------+---------+-----------+------------+------------------+----------+-----------+
+--------+---------+-----------+------------+------------------+----------+-----------+



### malo interesantniji sql upiti

In [37]:
spark.sql("SELECT * from vax where location like 'North%'").show()

+--------------------+---------+-----------+------------+------------------+----------+------------------+
|            location|continent|total_cases|total_deaths|total_vaccinations|population|       perc_deaths|
+--------------------+---------+-----------+------------+------------------+----------+------------------+
|       North America|      nan|  124492666|     1671178|        1158547416| 600323657|  1.34239072364311|
|         North Korea|     Asia|          0|           0|                 0|  26069416|               0.0|
|     North Macedonia|   Europe|     350924|        9978|                 0|   2093606| 2.843350697017018|
|Northern Mariana ...|  Oceania|      14912|          41|                 0|     49574|0.2749463519313305|
+--------------------+---------+-----------+------------+------------------+----------+------------------+



In [39]:
spark.sql("SELECT continent,count(*) from vax group by continent ORDER BY 2 DESC").show()

+-------------+--------+
|    continent|count(1)|
+-------------+--------+
|       Africa|      57|
|       Europe|      51|
|         Asia|      48|
|North America|      41|
|      Oceania|      24|
|South America|      14|
|          nan|      12|
+-------------+--------+

