# **Labs 1 and 2 PySpark:**

In these labs we will be using the "[[NeurIPS 2020] Data Science for COVID-19 (DS4C)](https://www.kaggle.com/datasets/kimjihoo/coronavirusdataset?select=PatientInfo.csv)" dataset, retrieved from [Kaggle](https://www.kaggle.com/) on 1/6/2022, for educational non commercial purpose, License
[CC BY-NC-SA 4.0
](https://creativecommons.org/licenses/by-nc-sa/4.0/)


The csv file that we will be using in this lab is **PatientInfo**.

## PatientInfo.csv

**patient_id**
the ID of the patient

**sex**
the sex of the patient

**age**
the age of the patient

**country**
the country of the patient

**province**
the province of the patient

**city**
the city of the patient

**infection_case**
the case of infection

**infected_by**
the ID of who infected the patient


**contact_number**
the number of contacts with people

**symptom_onset_date**
the date of symptom onset

**confirmed_date**
the date of being confirmed

**released_date**
the date of being released

**deceased_date**
the date of being deceased

**state**
isolated / released / deceased

### Import the pyspark and check it's version

In [1]:
spark

24/08/28 13:41:30 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


### Import and create SparkSession

In [2]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

### Load the PatientInfo.csv file and show the first 5 rows

In [3]:
from IPython.display import display, HTML
display(HTML("<style>pre { white-space: pre !important; }</style>"))

In [4]:
df_p = spark.read.format('csv')\
.option('inferScheam', 'true')\
.option('header', 'true')\
.load('PatientInfo.csv')

In [5]:
df_p.cache()

DataFrame[patient_id: string, sex: string, age: string, country: string, province: string, city: string, infection_case: string, infected_by: string, contact_number: string, symptom_onset_date: string, confirmed_date: string, released_date: string, deceased_date: string, state: string]

In [6]:
df_p.show()

+----------+------+---+-------+--------+------------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+
|patient_id|   sex|age|country|province|        city|      infection_case|infected_by|contact_number|symptom_onset_date|confirmed_date|released_date|deceased_date|   state|
+----------+------+---+-------+--------+------------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+
|1000000001|  male|50s|  Korea|   Seoul|  Gangseo-gu|     overseas inflow|       NULL|            75|        2020-01-22|    2020-01-23|   2020-02-05|         NULL|released|
|1000000002|  male|30s|  Korea|   Seoul| Jungnang-gu|     overseas inflow|       NULL|            31|              NULL|    2020-01-30|   2020-03-02|         NULL|released|
|1000000003|  male|50s|  Korea|   Seoul|   Jongno-gu|contact with patient| 2002000001|            17|              NULL|    2020-01-30|

### Display the schema of the dataset

In [7]:
df_p.printSchema()

root
 |-- patient_id: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- age: string (nullable = true)
 |-- country: string (nullable = true)
 |-- province: string (nullable = true)
 |-- city: string (nullable = true)
 |-- infection_case: string (nullable = true)
 |-- infected_by: string (nullable = true)
 |-- contact_number: string (nullable = true)
 |-- symptom_onset_date: string (nullable = true)
 |-- confirmed_date: string (nullable = true)
 |-- released_date: string (nullable = true)
 |-- deceased_date: string (nullable = true)
 |-- state: string (nullable = true)



### Display the statistical summary

In [8]:
df_summary = df_p.summary()
df_summary.show()

24/08/28 13:41:55 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
[Stage 3:>                                                          (0 + 1) / 1]

+-------+--------------------+------+----+----------+--------+--------------+--------------------+--------------------+--------------------+------------------+--------------+-------------+-------------+--------+
|summary|          patient_id|   sex| age|   country|province|          city|      infection_case|         infected_by|      contact_number|symptom_onset_date|confirmed_date|released_date|deceased_date|   state|
+-------+--------------------+------+----+----------+--------+--------------+--------------------+--------------------+--------------------+------------------+--------------+-------------+-------------+--------+
|  count|                5165|  4043|3785|      5165|    5165|          5071|                4246|                1346|                 791|               690|          5162|         1587|           66|    5165|
|   mean|2.8636345618679576E9|  NULL|NULL|      NULL|    NULL|          NULL|                NULL|2.2845944015643125E9|1.6772572523506988E7|            

                                                                                

### Using the state column.
### How many people survived (released), and how many didn't survive (isolated/deceased)?

In [9]:
state = df_p.groupBy("state").count()
state.show()

+--------+-----+
|   state|count|
+--------+-----+
|isolated| 2158|
|released| 2929|
|deceased|   78|
+--------+-----+



### Display the number of null values in each column

In [10]:
from pyspark.sql.functions import *
import pyspark.sql.functions as F

In [11]:
null_dict = {}
for i in df_p.columns:
    null_c = df_p.filter(col(i).isNull()).count()
    null_dict [i] = null_c

for col, num in null_dict.items():
    print(col, num)


patient_id 0
sex 1122
age 1380
country 0
province 0
city 94
infection_case 919
infected_by 3819
contact_number 4374
symptom_onset_date 4475
confirmed_date 3
released_date 3578
deceased_date 5099
state 0


## Data preprocessing

### Fill the nulls in the deceased_date with the released_date. 
- You can use <b>coalesce</b> function

In [12]:
from pyspark.sql.functions import coalesce, col
dec_col = df_p.withColumn('deceased_date', coalesce(col('deceased_date'), col('released_date')))

dec_col.show()

+----------+------+---+-------+--------+------------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+
|patient_id|   sex|age|country|province|        city|      infection_case|infected_by|contact_number|symptom_onset_date|confirmed_date|released_date|deceased_date|   state|
+----------+------+---+-------+--------+------------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+
|1000000001|  male|50s|  Korea|   Seoul|  Gangseo-gu|     overseas inflow|       NULL|            75|        2020-01-22|    2020-01-23|   2020-02-05|   2020-02-05|released|
|1000000002|  male|30s|  Korea|   Seoul| Jungnang-gu|     overseas inflow|       NULL|            31|              NULL|    2020-01-30|   2020-03-02|   2020-03-02|released|
|1000000003|  male|50s|  Korea|   Seoul|   Jongno-gu|contact with patient| 2002000001|            17|              NULL|    2020-01-30|

### Add a column named no_days which is difference between the deceased_date and the confirmed_date then show the top 5 rows. Print the schema.
- <b> Hint: You need to typecast these columns as date first <b>

In [13]:
from pyspark.sql.types import DateType

df_p_n = dec_col.withColumn('confirmed_date', col('confirmed_date').cast(DateType()))\
                 .withColumn('deceased_date', col('deceased_date').cast(DateType()))\
                 .withColumn('no_days', datediff(col('deceased_date'), col('confirmed_date')))
df_p_n.show(5)

#withColumn('released_date', col('released_date').cast(DateType()))\
#df = df.withColumn('deceased_date', coalesce(col('deceased_date').cast(DateType()), col('released_date')))

+----------+------+---+-------+--------+-----------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+-------+
|patient_id|   sex|age|country|province|       city|      infection_case|infected_by|contact_number|symptom_onset_date|confirmed_date|released_date|deceased_date|   state|no_days|
+----------+------+---+-------+--------+-----------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+-------+
|1000000001|  male|50s|  Korea|   Seoul| Gangseo-gu|     overseas inflow|       NULL|            75|        2020-01-22|    2020-01-23|   2020-02-05|   2020-02-05|released|     13|
|1000000002|  male|30s|  Korea|   Seoul|Jungnang-gu|     overseas inflow|       NULL|            31|              NULL|    2020-01-30|   2020-03-02|   2020-03-02|released|     32|
|1000000003|  male|50s|  Korea|   Seoul|  Jongno-gu|contact with patient| 2002000001|            17|

In [14]:
df_p_n.printSchema()

root
 |-- patient_id: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- age: string (nullable = true)
 |-- country: string (nullable = true)
 |-- province: string (nullable = true)
 |-- city: string (nullable = true)
 |-- infection_case: string (nullable = true)
 |-- infected_by: string (nullable = true)
 |-- contact_number: string (nullable = true)
 |-- symptom_onset_date: string (nullable = true)
 |-- confirmed_date: date (nullable = true)
 |-- released_date: string (nullable = true)
 |-- deceased_date: date (nullable = true)
 |-- state: string (nullable = true)
 |-- no_days: integer (nullable = true)



### Add a is_male column if male then it should yield true, else then False

In [15]:
df_n = df_p_n.withColumn('is_male', 
                         when(col('sex') == 'male', True)
                         .when(col('sex') == 'female', False))
#df_new = df_n.fillna({'is_male': False})

df_n.show()

+----------+------+---+-------+--------+------------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+-------+-------+
|patient_id|   sex|age|country|province|        city|      infection_case|infected_by|contact_number|symptom_onset_date|confirmed_date|released_date|deceased_date|   state|no_days|is_male|
+----------+------+---+-------+--------+------------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+-------+-------+
|1000000001|  male|50s|  Korea|   Seoul|  Gangseo-gu|     overseas inflow|       NULL|            75|        2020-01-22|    2020-01-23|   2020-02-05|   2020-02-05|released|     13|   true|
|1000000002|  male|30s|  Korea|   Seoul| Jungnang-gu|     overseas inflow|       NULL|            31|              NULL|    2020-01-30|   2020-03-02|   2020-03-02|released|     32|   true|
|1000000003|  male|50s|  Korea|   Seoul|   Jongno-gu|co

### Add a is_dead column if patient state is not released then it should yield true, else then False

- Use <b>UDF</b> to perform this task. 
- However, UDF is not recommended there is no built in function can do the required operation.
- UDF is slower than built in functions.

In [20]:
from pyspark.sql.types import StringType

def udf_dead(s):
    if s != 'released':
        return 'True'  
    else: 
        return 'False'

s_udf = udf(udf_dead, StringType()) # this solve the error of matching the function return value
df_pat = df_new.withColumn('is_dead', s_udf(col('state')))
df_pat.show()
#df_pat = df_new.withColumn('is_dead', is_dead(col('state')))

[Stage 55:>                                                         (0 + 1) / 1]

+----------+------+---+-------+--------+------------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+-------+-------+-------+
|patient_id|   sex|age|country|province|        city|      infection_case|infected_by|contact_number|symptom_onset_date|confirmed_date|released_date|deceased_date|   state|no_days|is_male|is_dead|
+----------+------+---+-------+--------+------------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+-------+-------+-------+
|1000000001|  male|50s|  Korea|   Seoul|  Gangseo-gu|     overseas inflow|       NULL|            75|        2020-01-22|    2020-01-23|   2020-02-05|   2020-02-05|released|     13|   true|  False|
|1000000002|  male|30s|  Korea|   Seoul| Jungnang-gu|     overseas inflow|       NULL|            31|              NULL|    2020-01-30|   2020-03-02|   2020-03-02|released|     32|   true|  False|
|1000000003|  m

                                                                                

In [19]:
df1 = df_n.withColumn('is_dead',when(col('state') != 'released', True))
df2 = df1.fillna({'is_dead': False})
df2.show()

+----------+------+---+-------+--------+------------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+-------+-------+-------+
|patient_id|   sex|age|country|province|        city|      infection_case|infected_by|contact_number|symptom_onset_date|confirmed_date|released_date|deceased_date|   state|no_days|is_male|is_dead|
+----------+------+---+-------+--------+------------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+-------+-------+-------+
|1000000001|  male|50s|  Korea|   Seoul|  Gangseo-gu|     overseas inflow|       NULL|            75|        2020-01-22|    2020-01-23|   2020-02-05|   2020-02-05|released|     13|   true|  false|
|1000000002|  male|30s|  Korea|   Seoul| Jungnang-gu|     overseas inflow|       NULL|            31|              NULL|    2020-01-30|   2020-03-02|   2020-03-02|released|     32|   true|  false|
|1000000003|  m

### Change the ages to bins from 10s, 0s, 10s, 20s,.etc to 0,10, 20

In [20]:
age = df2.groupBy("age").count()
age.show()

+----+-----+
| age|count|
+----+-----+
|  0s|   66|
| 10s|  178|
|NULL| 1380|
| 40s|  518|
| 80s|  170|
| 70s|  232|
| 90s|   49|
| 20s|  899|
| 50s|  667|
| 60s|  482|
|100s|    1|
| 30s|  523|
+----+-----+



In [66]:
'''
df3 = df2.withColumn('age',
     when(col('age') == '0s', 0)
    .when(col('age') == '10s', 10)
    .when(col('age') == '20s', 20)
    .when(col('age') == '30s', 30)
    .when(col('age') == '40s', 40)
    .when(col('age') == '50s', 50)
    .when(col('age') == '60s', 60)
    .when(col('age') == '70s', 70)
    .when(col('age') == '80s', 80)
    .when(col('age') == '90s', 90)
    .when(col('age') == '100s', 100)
    .otherwise(col('age'))  
)
df3.show()
'''
df3 = df2.withColumn('age', translate(col('age'), "s", ""))
df3.show()

+----------+------+---+-------+--------+------------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+-------+-------+-------+
|patient_id|   sex|age|country|province|        city|      infection_case|infected_by|contact_number|symptom_onset_date|confirmed_date|released_date|deceased_date|   state|no_days|is_male|is_dead|
+----------+------+---+-------+--------+------------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+-------+-------+-------+
|1000000001|  male| 50|  Korea|   Seoul|  Gangseo-gu|     overseas inflow|       NULL|            75|        2020-01-22|    2020-01-23|   2020-02-05|   2020-02-05|released|     13|   true|  false|
|1000000002|  male| 30|  Korea|   Seoul| Jungnang-gu|     overseas inflow|       NULL|            31|              NULL|    2020-01-30|   2020-03-02|   2020-03-02|released|     32|   true|  false|
|1000000003|  m

In [22]:
def proc_age(age):
    if age.endswith('s'):
        return age[:-1]  
    return age 

age_udf = udf(proc_age, StringType())
df_patnew = df_pat.withColumn('age', age_udf(col('age')))

df_patnew.show()

+----------+------+---+-------+--------+------------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+-------+-------+-------+
|patient_id|   sex|age|country|province|        city|      infection_case|infected_by|contact_number|symptom_onset_date|confirmed_date|released_date|deceased_date|   state|no_days|is_male|is_dead|
+----------+------+---+-------+--------+------------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+-------+-------+-------+
|1000000001|  male| 50|  Korea|   Seoul|  Gangseo-gu|     overseas inflow|       NULL|            75|        2020-01-22|    2020-01-23|   2020-02-05|   2020-02-05|released|     13|   true|  False|
|1000000002|  male| 30|  Korea|   Seoul| Jungnang-gu|     overseas inflow|       NULL|            31|              NULL|    2020-01-30|   2020-03-02|   2020-03-02|released|     32|   true|  False|
|1000000003|  m

### Change age, and no_days  to be typecasted as Double

In [36]:
from pyspark.sql.types import DoubleType
df4 = df3.withColumn('age', col('age').cast(DoubleType()))\
           .withColumn('no_days', col('no_days').cast(DoubleType()))
df4.printSchema()

root
 |-- patient_id: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- age: double (nullable = true)
 |-- country: string (nullable = true)
 |-- province: string (nullable = true)
 |-- city: string (nullable = true)
 |-- infection_case: string (nullable = true)
 |-- infected_by: string (nullable = true)
 |-- contact_number: string (nullable = true)
 |-- symptom_onset_date: string (nullable = true)
 |-- confirmed_date: date (nullable = true)
 |-- released_date: string (nullable = true)
 |-- deceased_date: date (nullable = true)
 |-- state: string (nullable = true)
 |-- no_days: double (nullable = true)
 |-- is_male: boolean (nullable = false)
 |-- is_dead: boolean (nullable = false)



### Drop the columns
["patient_id","sex","infected_by","contact_number","released_date","state",
"symptom_onset_date","confirmed_date","deceased_date","country","no_days",
"city","infection_case"]

In [37]:
df5 = df4.drop('patient_id')\
       .drop('sex')\
       .drop('infected_by')\
       .drop('contact_number')\
       .drop('released_date')\
       .drop('state')\
       .drop('symptom_onset_date')\
       .drop('confirmed_date')\
       .drop('deceased_date')\
       .drop('country')\
       .drop('no_days')\
       .drop('city')\
       .drop('infection_case')

In [38]:
df5.printSchema()

root
 |-- age: double (nullable = true)
 |-- province: string (nullable = true)
 |-- is_male: boolean (nullable = false)
 |-- is_dead: boolean (nullable = false)



In [39]:
df5.show(4)

+----+--------+-------+-------+
| age|province|is_male|is_dead|
+----+--------+-------+-------+
|50.0|   Seoul|   true|  false|
|30.0|   Seoul|   true|  false|
|50.0|   Seoul|   true|  false|
|20.0|   Seoul|   true|  false|
+----+--------+-------+-------+
only showing top 4 rows



### Recount the number of nulls now

In [43]:
df5.show()

+----+--------+-------+-------+
| age|province|is_male|is_dead|
+----+--------+-------+-------+
|50.0|   Seoul|   true|  false|
|30.0|   Seoul|   true|  false|
|50.0|   Seoul|   true|  false|
|20.0|   Seoul|   true|  false|
|20.0|   Seoul|  false|  false|
|50.0|   Seoul|  false|  false|
|20.0|   Seoul|   true|  false|
|20.0|   Seoul|   true|  false|
|30.0|   Seoul|   true|  false|
|60.0|   Seoul|  false|  false|
|50.0|   Seoul|  false|  false|
|20.0|   Seoul|   true|  false|
|80.0|   Seoul|   true|   true|
|60.0|   Seoul|  false|  false|
|70.0|   Seoul|   true|  false|
|70.0|   Seoul|   true|  false|
|70.0|   Seoul|   true|  false|
|20.0|   Seoul|   true|  false|
|70.0|   Seoul|  false|  false|
|70.0|   Seoul|  false|  false|
+----+--------+-------+-------+
only showing top 20 rows



In [44]:
df5.columns

['age', 'province', 'is_male', 'is_dead']

In [45]:
dict = {}
for x in df5.columns:
    nulls = df5.filter(col(x).isNull()).count()
    dict [x] = nulls

for c, n in dict.items():
    print(c, n)


age 1380
province 0
is_male 0
is_dead 0


## Now do the same but using SQL select statement

### From the original Patient DataFrame, Create a temporary view (table).

In [46]:
df_p.createOrReplaceTempView("patient_view")

### Use SELECT statement to select all columns from the dataframe and show the output.

In [47]:
all_cols = spark.sql(""" select * from patient_view """)
all_cols.show()

+----------+------+---+-------+--------+------------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+
|patient_id|   sex|age|country|province|        city|      infection_case|infected_by|contact_number|symptom_onset_date|confirmed_date|released_date|deceased_date|   state|
+----------+------+---+-------+--------+------------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+
|1000000001|  male|50s|  Korea|   Seoul|  Gangseo-gu|     overseas inflow|       NULL|            75|        2020-01-22|    2020-01-23|   2020-02-05|         NULL|released|
|1000000002|  male|30s|  Korea|   Seoul| Jungnang-gu|     overseas inflow|       NULL|            31|              NULL|    2020-01-30|   2020-03-02|         NULL|released|
|1000000003|  male|50s|  Korea|   Seoul|   Jongno-gu|contact with patient| 2002000001|            17|              NULL|    2020-01-30|

### *Using SQL commands*, limit the output to only 5 rows 

In [48]:
#lim_5rows = spark.sql(""" select top(5)* from patient_view  """)
lim_5rows = spark.sql(""" select * from patient_view limit 5  """)

lim_5rows.show()

+----------+------+---+-------+--------+-----------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+
|patient_id|   sex|age|country|province|       city|      infection_case|infected_by|contact_number|symptom_onset_date|confirmed_date|released_date|deceased_date|   state|
+----------+------+---+-------+--------+-----------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+
|1000000001|  male|50s|  Korea|   Seoul| Gangseo-gu|     overseas inflow|       NULL|            75|        2020-01-22|    2020-01-23|   2020-02-05|         NULL|released|
|1000000002|  male|30s|  Korea|   Seoul|Jungnang-gu|     overseas inflow|       NULL|            31|              NULL|    2020-01-30|   2020-03-02|         NULL|released|
|1000000003|  male|50s|  Korea|   Seoul|  Jongno-gu|contact with patient| 2002000001|            17|              NULL|    2020-01-30|   202

### Select the count of males and females in the dataset

In [49]:
c_m_f = spark.sql(""" select sex, count(sex) count
                      from patient_view
                      group by sex
""")
c_m_f.show()

+------+-----+
|   sex|count|
+------+-----+
|  NULL|    0|
|female| 2218|
|  male| 1825|
+------+-----+



### How many people did survive, and how many didn't?

In [50]:
surv = spark.sql(""" select state, count(state) count
                      from patient_view
                      group by state
""")
surv.show()

+--------+-----+
|   state|count|
+--------+-----+
|isolated| 2158|
|released| 2929|
|deceased|   78|
+--------+-----+



In [51]:
surv2 = spark.sql(""" 
select
    case
        when state = 'released' then 'survive'
        else 'not_survive'
    end as status,
    count(state) number
from patient_view
group by
    case
        when state = 'released' then 'survive'
        else 'not_survive'
    end
""")

surv2.show()

+-----------+------+
|     status|number|
+-----------+------+
|not_survive|  2236|
|    survive|  2929|
+-----------+------+



### Now, let's perform some preprocessing using SQL:
1. Convert *age* column to double after removing the 's' at the end -- *hint: check SUBSTRING method*
2. Select only the following columns: `['sex', 'age', 'province', 'state']`
3. Store the result of the query in a new dataframe

In [7]:
query = spark.sql("""
select
    sex,
    cast(substring(age, 1, length(age) - 1) AS DOUBLE) AS age,
    province,
    state
from
    patient_view
""")


In [8]:
query.show()

+------+----+--------+--------+
|   sex| age|province|   state|
+------+----+--------+--------+
|  male|50.0|   Seoul|released|
|  male|30.0|   Seoul|released|
|  male|50.0|   Seoul|released|
|  male|20.0|   Seoul|released|
|female|20.0|   Seoul|released|
|female|50.0|   Seoul|released|
|  male|20.0|   Seoul|released|
|  male|20.0|   Seoul|released|
|  male|30.0|   Seoul|released|
|female|60.0|   Seoul|released|
|female|50.0|   Seoul|released|
|  male|20.0|   Seoul|released|
|  male|80.0|   Seoul|deceased|
|female|60.0|   Seoul|released|
|  male|70.0|   Seoul|released|
|  male|70.0|   Seoul|released|
|  male|70.0|   Seoul|released|
|  male|20.0|   Seoul|released|
|female|70.0|   Seoul|released|
|female|70.0|   Seoul|released|
+------+----+--------+--------+
only showing top 20 rows



In [None]:


result_df = spark.sql(query)