## PySpark
* A PySpark library to apply SQL-like analysis on a huge amount of structured or semi-structured data. We can also use SQL queries with PySparkSQL. It can also be connected to Apache Hive. HiveQL can be also be applied. PySparkSQL is a wrapper over the PySpark core. PySparkSQL introduced the DataFrame, a tabular representation of structured data that is similar to that of a table from a relational database management system.

In [100]:
!pip install pyspark



In [42]:
import pyspark

In [43]:
import pandas as pd
df = pd.read_csv("C:\\Users\\saura\\Downloads\\covid_toy.csv")

In [44]:
df.head()

Unnamed: 0,age,gender,fever,cough,city,has_covid
0,60.0,Male,103.0,Mild,Kolkata,No
1,27.0,Male,100.0,Mild,Delhi,Yes
2,42.0,Male,101.0,Mild,Delhi,No
3,31.0,Female,98.0,Mild,Kolkata,
4,65.0,Female,101.0,Mild,Mumbai,No


In [45]:
from pyspark.sql import SparkSession

In [46]:
spark = SparkSession.builder.appName("OCD1").getOrCreate()

In [47]:
spark

In [48]:
df1 = spark.read.csv("C:\\Users\\saura\\Downloads\\covid_toy.csv")

In [49]:
df1

DataFrame[_c0: string, _c1: string, _c2: string, _c3: string, _c4: string, _c5: string]

In [50]:
df1.show(5)

+---+------+-----+-----+-------+---------+
|_c0|   _c1|  _c2|  _c3|    _c4|      _c5|
+---+------+-----+-----+-------+---------+
|age|gender|fever|cough|   city|has_covid|
| 60|  Male|  103| Mild|Kolkata|       No|
| 27|  Male|  100| Mild|  Delhi|      Yes|
| 42|  Male|  101| Mild|  Delhi|       No|
| 31|Female|   98| Mild|Kolkata|     NULL|
+---+------+-----+-----+-------+---------+
only showing top 5 rows



In [51]:
df1 = spark.read.option("header", True).csv("C:\\Users\\saura\\Downloads\\covid_toy.csv")

In [52]:
df1.show()

+----+------+-----+------+---------+---------+
| age|gender|fever| cough|     city|has_covid|
+----+------+-----+------+---------+---------+
|  60|  Male|  103|  Mild|  Kolkata|       No|
|  27|  Male|  100|  Mild|    Delhi|      Yes|
|  42|  Male|  101|  Mild|    Delhi|       No|
|  31|Female|   98|  Mild|  Kolkata|     NULL|
|  65|Female|  101|  Mild|   Mumbai|       No|
|NULL|Female| NULL|  Mild|Bangalore|      Yes|
|  14|  NULL|  101|Strong|Bangalore|       No|
|  20|Female| NULL|Strong|   Mumbai|      Yes|
|  19|Female|  100|Strong|Bangalore|     NULL|
|  64|Female|  101|  Mild|    Delhi|       No|
|  75|Female| NULL|  Mild|    Delhi|       No|
|  65|Female|   98|  Mild|   Mumbai|      Yes|
|  25|Female|   99|Strong|  Kolkata|       No|
|  64|  NULL|  102|  Mild|Bangalore|      Yes|
|  51|  Male|  104|  Mild|Bangalore|       No|
|  70|  Male|  103|Strong|  Kolkata|      Yes|
|  69|Female|  103|  Mild|  Kolkata|      Yes|
|  40|Female|   98|Strong|    Delhi|       No|
|  64|Female|

In [53]:
df1.printSchema()

root
 |-- age: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- fever: string (nullable = true)
 |-- cough: string (nullable = true)
 |-- city: string (nullable = true)
 |-- has_covid: string (nullable = true)



In [54]:
type(df1)

pyspark.sql.dataframe.DataFrame

In [55]:
df1.columns

['age', 'gender', 'fever', 'cough', 'city', 'has_covid']

In [56]:
df.head()

Unnamed: 0,age,gender,fever,cough,city,has_covid
0,60.0,Male,103.0,Mild,Kolkata,No
1,27.0,Male,100.0,Mild,Delhi,Yes
2,42.0,Male,101.0,Mild,Delhi,No
3,31.0,Female,98.0,Mild,Kolkata,
4,65.0,Female,101.0,Mild,Mumbai,No


In [57]:
df1.select('city','has_covid').show(5)

+-------+---------+
|   city|has_covid|
+-------+---------+
|Kolkata|       No|
|  Delhi|      Yes|
|  Delhi|       No|
|Kolkata|     NULL|
| Mumbai|       No|
+-------+---------+
only showing top 5 rows



In [58]:
df1.dtypes

[('age', 'string'),
 ('gender', 'string'),
 ('fever', 'string'),
 ('cough', 'string'),
 ('city', 'string'),
 ('has_covid', 'string')]

In [59]:
df1.describe().show()

+-------+------------------+------+------------------+------+---------+---------+
|summary|               age|gender|             fever| cough|     city|has_covid|
+-------+------------------+------+------------------+------+---------+---------+
|  count|                99|    98|                90|   100|       98|       97|
|   mean| 43.81818181818182|  NULL|100.84444444444445|  NULL|     NULL|     NULL|
| stddev|24.677248728813904|  NULL| 2.054926178236703|  NULL|     NULL|     NULL|
|    min|                10|Female|               100|  Mild|Bangalore|       No|
|    max|                83|  Male|                99|Strong|   Mumbai|      Yes|
+-------+------------------+------+------------------+------+---------+---------+



In [60]:
df1 = df1.withColumn("fever",
                    df1["fever"].cast('float'))
df1 = df1.withColumn("age",
                    df1["age"].cast('int'))

In [61]:
df1.dtypes

[('age', 'int'),
 ('gender', 'string'),
 ('fever', 'float'),
 ('cough', 'string'),
 ('city', 'string'),
 ('has_covid', 'string')]

In [62]:
df1.withColumn("rise_fever",df1["fever"]+5).show(5)

+---+------+-----+-----+-------+---------+----------+
|age|gender|fever|cough|   city|has_covid|rise_fever|
+---+------+-----+-----+-------+---------+----------+
| 60|  Male|103.0| Mild|Kolkata|       No|     108.0|
| 27|  Male|100.0| Mild|  Delhi|      Yes|     105.0|
| 42|  Male|101.0| Mild|  Delhi|       No|     106.0|
| 31|Female| 98.0| Mild|Kolkata|     NULL|     103.0|
| 65|Female|101.0| Mild| Mumbai|       No|     106.0|
+---+------+-----+-----+-------+---------+----------+
only showing top 5 rows



In [63]:
df1.drop('fever').show(5)

+---+------+-----+-------+---------+
|age|gender|cough|   city|has_covid|
+---+------+-----+-------+---------+
| 60|  Male| Mild|Kolkata|       No|
| 27|  Male| Mild|  Delhi|      Yes|
| 42|  Male| Mild|  Delhi|       No|
| 31|Female| Mild|Kolkata|     NULL|
| 65|Female| Mild| Mumbai|       No|
+---+------+-----+-------+---------+
only showing top 5 rows



In [64]:
df1.withColumnRenamed('city','region').show(2)

+---+------+-----+-----+-------+---------+
|age|gender|fever|cough| region|has_covid|
+---+------+-----+-----+-------+---------+
| 60|  Male|103.0| Mild|Kolkata|       No|
| 27|  Male|100.0| Mild|  Delhi|      Yes|
+---+------+-----+-----+-------+---------+
only showing top 2 rows



In [65]:
df1.where(df1.fever.isNull()).count()

10

In [66]:
df1.na.drop().show(5)

+---+------+-----+-----+-------+---------+
|age|gender|fever|cough|   city|has_covid|
+---+------+-----+-----+-------+---------+
| 60|  Male|103.0| Mild|Kolkata|       No|
| 27|  Male|100.0| Mild|  Delhi|      Yes|
| 42|  Male|101.0| Mild|  Delhi|       No|
| 65|Female|101.0| Mild| Mumbai|       No|
| 64|Female|101.0| Mild|  Delhi|       No|
+---+------+-----+-----+-------+---------+
only showing top 5 rows



In [72]:
df1.na.drop(how="any").show(8)

+---+------+-----+------+---------+---------+
|age|gender|fever| cough|     city|has_covid|
+---+------+-----+------+---------+---------+
| 60|  Male|103.0|  Mild|  Kolkata|       No|
| 27|  Male|100.0|  Mild|    Delhi|      Yes|
| 42|  Male|101.0|  Mild|    Delhi|       No|
| 65|Female|101.0|  Mild|   Mumbai|       No|
| 64|Female|101.0|  Mild|    Delhi|       No|
| 65|Female| 98.0|  Mild|   Mumbai|      Yes|
| 25|Female| 99.0|Strong|  Kolkata|       No|
| 51|  Male|104.0|  Mild|Bangalore|       No|
+---+------+-----+------+---------+---------+
only showing top 8 rows



In [68]:
df1.na.drop(how="any",subset=["fever"]).show(5)

+---+------+-----+-----+-------+---------+
|age|gender|fever|cough|   city|has_covid|
+---+------+-----+-----+-------+---------+
| 60|  Male|103.0| Mild|Kolkata|       No|
| 27|  Male|100.0| Mild|  Delhi|      Yes|
| 42|  Male|101.0| Mild|  Delhi|       No|
| 31|Female| 98.0| Mild|Kolkata|     NULL|
| 65|Female|101.0| Mild| Mumbai|       No|
+---+------+-----+-----+-------+---------+
only showing top 5 rows



In [74]:
df1.na.fill("missing").show(5)

+---+------+-----+-----+-------+---------+
|age|gender|fever|cough|   city|has_covid|
+---+------+-----+-----+-------+---------+
| 60|  Male|103.0| Mild|Kolkata|       No|
| 27|  Male|100.0| Mild|  Delhi|      Yes|
| 42|  Male|101.0| Mild|  Delhi|       No|
| 31|Female| 98.0| Mild|Kolkata|  missing|
| 65|Female|101.0| Mild| Mumbai|       No|
+---+------+-----+-----+-------+---------+
only showing top 5 rows



In [79]:
df1.na.fill(0,['age','fever']).show(7)

+---+------+-----+------+---------+---------+
|age|gender|fever| cough|     city|has_covid|
+---+------+-----+------+---------+---------+
| 60|  Male|103.0|  Mild|  Kolkata|       No|
| 27|  Male|100.0|  Mild|    Delhi|      Yes|
| 42|  Male|101.0|  Mild|    Delhi|       No|
| 31|Female| 98.0|  Mild|  Kolkata|     NULL|
| 65|Female|101.0|  Mild|   Mumbai|       No|
|  0|Female|  0.0|  Mild|Bangalore|      Yes|
| 14|  NULL|101.0|Strong|Bangalore|       No|
+---+------+-----+------+---------+---------+
only showing top 7 rows



In [80]:
from pyspark.ml.feature import Imputer

In [85]:
imputer = Imputer(inputCols=["age","fever"],
                 outputCols=["{}_imputed".format(c) for c in ["age","fever"]]).setStrategy("median")

In [86]:
imputer.fit(df1).transform(df1).show()

+----+------+-----+------+---------+---------+-----------+-------------+
| age|gender|fever| cough|     city|has_covid|age_imputed|fever_imputed|
+----+------+-----+------+---------+---------+-----------+-------------+
|  60|  Male|103.0|  Mild|  Kolkata|       No|         60|        103.0|
|  27|  Male|100.0|  Mild|    Delhi|      Yes|         27|        100.0|
|  42|  Male|101.0|  Mild|    Delhi|       No|         42|        101.0|
|  31|Female| 98.0|  Mild|  Kolkata|     NULL|         31|         98.0|
|  65|Female|101.0|  Mild|   Mumbai|       No|         65|        101.0|
|NULL|Female| NULL|  Mild|Bangalore|      Yes|         44|        101.0|
|  14|  NULL|101.0|Strong|Bangalore|       No|         14|        101.0|
|  20|Female| NULL|Strong|   Mumbai|      Yes|         20|        101.0|
|  19|Female|100.0|Strong|Bangalore|     NULL|         19|        100.0|
|  64|Female|101.0|  Mild|    Delhi|       No|         64|        101.0|
|  75|Female| NULL|  Mild|    Delhi|       No|     

In [95]:
df1.filter("age >= 70").show(5)

+---+------+-----+------+---------+---------+
|age|gender|fever| cough|     city|has_covid|
+---+------+-----+------+---------+---------+
| 75|Female| NULL|  Mild|    Delhi|       No|
| 70|  Male|103.0|Strong|  Kolkata|      Yes|
| 73|  Male| 98.0|  Mild|Bangalore|     NULL|
| 71|Female| 98.0|Strong|  Kolkata|      Yes|
| 80|Female| 98.0|  Mild|    Delhi|      Yes|
+---+------+-----+------+---------+---------+
only showing top 5 rows



In [96]:
df1.filter("age >= 70").select(["age","has_covid"]).show(5)

+---+---------+
|age|has_covid|
+---+---------+
| 75|       No|
| 70|      Yes|
| 73|     NULL|
| 71|      Yes|
| 80|      Yes|
+---+---------+
only showing top 5 rows



In [99]:
df1.filter((df1["fever"] <= 105) & 
           (df1["fever"] >= 100)).show(5)

+---+------+-----+------+---------+---------+
|age|gender|fever| cough|     city|has_covid|
+---+------+-----+------+---------+---------+
| 60|  Male|103.0|  Mild|  Kolkata|       No|
| 27|  Male|100.0|  Mild|    Delhi|      Yes|
| 42|  Male|101.0|  Mild|    Delhi|       No|
| 65|Female|101.0|  Mild|   Mumbai|       No|
| 14|  NULL|101.0|Strong|Bangalore|       No|
+---+------+-----+------+---------+---------+
only showing top 5 rows

