# Python - pyspark for dtap

## COVID19 HK dataset

In [3]:
from pyspark import SparkConf, SparkContext
conf = SparkConf().set("spark.jars", "/opt/bdfs/bluedata-dtap.jar")
sc = SparkContext( conf=conf)
sc._jsc.hadoopConfiguration().set('fs.dtap.impl', 'com.bluedata.hadoop.bdfs.Bdfs')
sc._jsc.hadoopConfiguration().set('fs.AbstractFileSystem.dtap.impl', 'com.bluedata.hadoop.bdfs.BdAbstractFS')

In [4]:
text = sc.textFile("dtap://TenantStorage/HPE_Ezmeral_Learner_Guide_Final.txt")
text.take(5)

['Using HPE Ezmeral Container Platform',
 'Learner Guide',
 'Rev. 21.11',
 '',
 '\x0c\uf6d9 Copyright 2021 Hewlett Packard Enterprise Development LP']

## Creating the SparkSession

In [5]:
# initiate Spark Context
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").getOrCreate()


In [6]:
df = spark.read.csv('dtap://TenantStorage/enhanced_sur_covid_19_eng_.csv', header=True, inferSchema=True)

In [7]:
df.count() # return number of data
df.printSchema()
df.show() # only show top 20 result


root
 |-- Case no.: integer (nullable = true)
 |-- Report date: string (nullable = true)
 |-- Date of onset: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Name of hospital admitted: string (nullable = true)
 |-- Hospitalised/Discharged/Deceased: string (nullable = true)
 |-- HK/Non-HK resident: string (nullable = true)
 |-- Case classification*: string (nullable = true)
 |-- Confirmed/probable: string (nullable = true)

+--------+-----------+-------------+------+---+-------------------------+--------------------------------+------------------+--------------------+------------------+
|Case no.|Report date|Date of onset|Gender|Age|Name of hospital admitted|Hospitalised/Discharged/Deceased|HK/Non-HK resident|Case classification*|Confirmed/probable|
+--------+-----------+-------------+------+---+-------------------------+--------------------------------+------------------+--------------------+------------------+
|       1| 23/01/202

In [14]:
df.select(df["`Case no.`"], df["Gender"], df["Age"]).filter('Age = 39').filter('Gender = "M"').show()
df.filter('`Case no.` = 2000').show()
df.groupBy("Gender").mean("Age").show()

+--------+------+---+
|Case no.|Gender|Age|
+--------+------+---+
|       1|     M| 39|
|      13|     M| 39|
|     146|     M| 39|
|     308|     M| 39|
|     343|     M| 39|
|     375|     M| 39|
|     432|     M| 39|
|     487|     M| 39|
|     638|     M| 39|
|     719|     M| 39|
|     769|     M| 39|
|     877|     M| 39|
|     972|     M| 39|
|    1028|     M| 39|
|    1098|     M| 39|
|    1190|     M| 39|
|    1747|     M| 39|
|    2024|     M| 39|
|    2045|     M| 39|
|    2599|     M| 39|
+--------+------+---+
only showing top 20 rows

+--------+-----------+-------------+------+---+-------------------------+--------------------------------+------------------+--------------------+------------------+
|Case no.|Report date|Date of onset|Gender|Age|Name of hospital admitted|Hospitalised/Discharged/Deceased|HK/Non-HK resident|Case classification*|Confirmed/probable|
+--------+-----------+-------------+------+---+-------------------------+--------------------------------+--------

In [9]:
df.select(df["Case classification*"]).distinct().show(truncate = False)

+-------------------------------------------------+
|Case classification*                             |
+-------------------------------------------------+
|Local case                                       |
|Epidemiologically linked with imported case      |
|Epidemiologically linked with possibly local case|
|Epidemiologically linked with local case         |
|Imported case                                    |
|Possibly local case                              |
+-------------------------------------------------+



In [18]:
df.groupBy("Age").count().show()

+---+-----+
|Age|count|
+---+-----+
| 31|  206|
| 85|   33|
| 65|  155|
| 53|  175|
| 78|   38|
| 34|  202|
| 81|   36|
| 28|  205|
| 76|   68|
| 26|  195|
| 27|  195|
| 44|  200|
| 12|   50|
| 91|   15|
| 22|  156|
| 93|    5|
| 47|  180|
|  1|   68|
| 52|  188|
| 13|   42|
+---+-----+
only showing top 20 rows



In [61]:
from pyspark.sql.functions import round
df.select('*', (round(df.Age/10)*10).alias("Age_Group")).groupBy("Age_Group").count().sort("Age_Group").show()

+---------+-----+
|Age_Group|count|
+---------+-----+
|      0.0|  290|
|     10.0|  477|
|     20.0| 1194|
|     30.0| 2007|
|     40.0| 2143|
|     50.0| 1842|
|     60.0| 1943|
|     70.0| 1236|
|     80.0|  480|
|     90.0|  211|
|    100.0|   19|
+---------+-----+

