In [1]:
import findspark

In [2]:
findspark.init('/home/tosin/spark-3.0.1-bin-hadoop2.7')

In [3]:
import pyspark

In [4]:
from pyspark.sql import SparkSession

In [5]:
spark = SparkSession.builder.appName('Alpha').getOrCreate()

In [6]:
data = spark.read.json('/home/tosin/Documents/Dataset/people.json')

In [7]:
data.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [8]:
#Run Schema

data.printSchema()

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)



In [9]:
#describe data

data.columns

['age', 'name']

In [10]:
data.describe().show()

+-------+------------------+-------+
|summary|               age|   name|
+-------+------------------+-------+
|  count|                 2|      3|
|   mean|              24.5|   null|
| stddev|7.7781745930520225|   null|
|    min|                19|   Andy|
|    max|                30|Michael|
+-------+------------------+-------+



In [11]:
water_df = spark.read.format('csv').option('header', 'True').load('/home/tosin/Documents/Dataset/effluent_data.csv')

In [12]:
water_df.show()

+---------------+--------------+---------------+------------------+--------------------+--------------------+--------------------+---------------+----+-----+
|SampleDateTime0|TreatmentPlant|SampleDateTime2|SampleLocationName|         SampleValue| NameDeterminandName|           UnitsName|BelowMinReading|Year|Month|
+---------------+--------------+---------------+------------------+--------------------+--------------------+--------------------+---------------+----+-----+
|     2015-11-19| Driffield STW|     2015-11-19|Treatment Effluent|0.013999999999999999|           triclosan|micrograms per litre|            Yes|2015|   11|
|     2015-04-06|Pen-y-bont STW|     2015-04-06|Treatment Effluent|              0.0005|              BDE 99|micrograms per litre|            Yes|2015|    4|
|     2015-04-06|Pen-y-bont STW|     2015-04-06|Treatment Effluent|              0.0005|             BDE 100|micrograms per litre|            Yes|2015|    4|
|     2015-04-06|Pen-y-bont STW|     2015-04-06|Trea

In [13]:
water_df.printSchema()

root
 |-- SampleDateTime0: string (nullable = true)
 |-- TreatmentPlant: string (nullable = true)
 |-- SampleDateTime2: string (nullable = true)
 |-- SampleLocationName: string (nullable = true)
 |-- SampleValue: string (nullable = true)
 |-- NameDeterminandName: string (nullable = true)
 |-- UnitsName: string (nullable = true)
 |-- BelowMinReading: string (nullable = true)
 |-- Year: string (nullable = true)
 |-- Month: string (nullable = true)



In [14]:
#edit the schemas appropriately

from pyspark.sql.types import StructField, StringType, IntegerType, StructType, DateType, FloatType

In [15]:
data_schema = [StructField('SampleDateTime0', DateType(), True),
              StructField('SampleDateTime2', DateType(), True),
              StructField('SampleValue', FloatType(), True),
              StructField('Month', IntegerType(), True)]

In [16]:
final_struc = StructType(fields=data_schema)

In [17]:
water_df = spark.read.csv('/home/tosin/Documents/Dataset/effluent_data.csv', header=True, inferSchema=True)

In [18]:
#water_df.printSchema()

In [19]:
water_df.dtypes


[('SampleDateTime0', 'string'),
 ('TreatmentPlant', 'string'),
 ('SampleDateTime2', 'string'),
 ('SampleLocationName', 'string'),
 ('SampleValue', 'double'),
 ('NameDeterminandName', 'string'),
 ('UnitsName', 'string'),
 ('BelowMinReading', 'string'),
 ('Year', 'int'),
 ('Month', 'int')]

In [20]:
#water_df[water_df.NameDeterminandName == 'triclosan'].show()

### Quick Exercise on Spark Dataframe

In [46]:
# Groupd dataset by Year
water_df.groupBy('Month').count().orderBy('count').show()  

+-----+-----+
|Month|count|
+-----+-----+
|   12|35325|
|    4|48810|
|    2|49474|
|    5|51861|
|    3|52973|
|    8|53602|
|    6|54137|
|   10|54933|
|    1|56030|
|    7|57147|
|    9|59533|
|   11|64298|
+-----+-----+



In [41]:
# Groupd dataset by Units
water_df.groupBy('UnitsName').count().orderBy('count').show()  

+--------------------+------+
|           UnitsName| count|
+--------------------+------+
|             Not Set|  1165|
|milligrammes per ...|134397|
|micrograms per litre|502561|
+--------------------+------+



In [40]:
# Groupd dataset by 'below min reading'
water_df.groupBy('BelowMinReading').count().orderBy('count').show()  

+---------------+------+
|BelowMinReading| count|
+---------------+------+
|            Yes|190959|
|             No|447164|
+---------------+------+



In [35]:
# Groupd dataset by Determinants
water_df.groupBy('NameDeterminandName').count().orderBy('count', ascending=False).show() 

#top

+--------------------+-----+
| NameDeterminandName|count|
+--------------------+-----+
|    nickel dissolved|12513|
|total suspended s...|12379|
|             calcium|12376|
|        nickel total|12376|
|          iron total|12376|
|        copper total|12375|
|dissolved organic...|12374|
|       cadmium total|12374|
|   cadmium dissolved|12370|
|          lead total|12370|
|      chromium total|12370|
|     aluminium total|12365|
|total organic carbon|12365|
|      iron dissolved|12364|
|          zinc total|12363|
|           triclosan|12363|
|    copper dissolved|12357|
| aluminium dissolved|12352|
|  aluminium reactive|12351|
|                PFOS|12347|
+--------------------+-----+
only showing top 20 rows



In [38]:
# Groupd dataset by treatment Plants
water_df.groupBy('TreatmentPlant').count().orderBy('count', ascending=False).show(200)  

+--------------------+-----+
|      TreatmentPlant|count|
+--------------------+-----+
|        Sherburn STW| 2414|
|       Rye Meads STW| 2232|
|          Hurley STW| 2071|
|         Birtley STW| 2005|
|          Marlow STW| 2002|
|       Melbourne STW| 1998|
|         Farnham STW| 1925|
|       Aldershot STW| 1925|
|          Battle STW| 1861|
|          Huyton STW| 1851|
|           Luton STW| 1848|
|         Reading STW| 1848|
|       Nettleham STW| 1844|
|        Oldhurst STW| 1837|
|       Tuddenham STW| 1798|
|           Wells STW| 1797|
|       Witchford STW| 1797|
|       Alconbury STW| 1796|
|      Swineshead STW| 1796|
|       Rackheath STW| 1795|
|            Acle STW| 1792|
|        Hanslope STW| 1786|
|      Waterbeach STW| 1767|
|         Duxford STW| 1748|
|    Uttons Drove STW| 1748|
|          Barrow STW| 1747|
|      Dullingham STW| 1747|
|        Frampton STW| 1729|
|      Callington STW| 1698|
|            Looe STW| 1655|
|     Horsham New STW| 1641|
|       Brough

In [57]:
determinants = water_df.groupBy('NameDeterminandName')

In [64]:
determinants.agg({'SampleValue': 'mean'}).collect()

[Row(NameDeterminandName='carbamazepine', avg(SampleValue)=0.6435719211822668),
 Row(NameDeterminandName='aluminium total', avg(SampleValue)=99.5387840679338),
 Row(NameDeterminandName='cypermethrin', avg(SampleValue)=0.001329596052610153),
 Row(NameDeterminandName='total phosphorus as P', avg(SampleValue)=2.8798776753670476),
 Row(NameDeterminandName='Biochemical Oxygen Demand', avg(SampleValue)=5.0058816686056185),
 Row(NameDeterminandName='tolyltriazole', avg(SampleValue)=1.6513827667984187),
 Row(NameDeterminandName='total suspended solids', avg(SampleValue)=10.668297923903367),
 Row(NameDeterminandName='fluoranthene', avg(SampleValue)=0.01392646653696515),
 Row(NameDeterminandName='copper dissolved', avg(SampleValue)=6.984541312616331),
 Row(NameDeterminandName='PFOS', avg(SampleValue)=0.034164214700736883),
 Row(NameDeterminandName='diclofenac', avg(SampleValue)=0.32460956607495084),
 Row(NameDeterminandName='soluble reactive phosphate as P', avg(SampleValue)=2.4206637587726463),

In [59]:
type (determinants)

pyspark.sql.group.GroupedData