<a href="https://colab.research.google.com/github/FelipeTavares7/data_science_projects/blob/main/ETL_whit_PySpark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

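# **ETL with PySpark**

Load IoT sensor readings from `sensores-iot.csv` with PySpark, derive date columns (`date`, `year`, `month`, `day`), fill missing temperature/humidity values with the column means, and replace out-of-range temperatures with the mean temperature.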

## Extract

In [None]:
# Installing pyspark
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m1.8 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=657fa161313196fa96e242ae9abbd8bc8f471d8ec0c6b593b07e324ff736894e
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [None]:
# Import SparkSession
from pyspark.sql import SparkSession

In [None]:
# Create the SparkSession (the entry point for the DataFrame API) under the app name
# 'sojaSensores'; getOrCreate() reuses an existing session if one is already running.
spark = (
    SparkSession.builder
    .appName('sojaSensores')
    .getOrCreate()
)

In [None]:
# Read the raw sensor CSV (first row is the header) and preview it.
# Without `inferSchema`, Spark loads every column as a string, so temperature/humidity
# (and latitude/longitude) would only work through implicit casts in the cells below.
# Inferring the schema costs one extra pass over the file but yields numeric columns.
# NOTE(review): `_c0` (unnamed first column) looks like a row index left by whatever
# exported the file — consider dropping it.
df = (spark.read.format('csv')
      .option('header', True)
      .option('inferSchema', True)
      .load('sensores-iot.csv'))
df.show(5)

+---+-----------+-----------+--------+--------------------+-----------+-----------+
|_c0|  device_id|temperature|humidity|           timestamp|   latitude|  longitude|
+---+-----------+-----------+--------+--------------------+-----------+-----------+
|  0|sensor-2231|         25|      49|2004-10-08 05:19:...| 39.0319385|-167.644666|
|  1|sensor-3869|         17|      42|2010-06-16 20:35:...| 43.4913955|  76.084671|
|  2|sensor-7079|         35|      58|2010-10-03 18:10:...|-39.9795415|-149.006886|
|  3|sensor-1163|         15|      64|2005-10-10 11:38:...| 28.1871005|-112.581419|
|  4|sensor-4483|         14|      65|2020-04-29 07:56:...| 89.2243515| -16.076939|
+---+-----------+-----------+--------+--------------------+-----------+-----------+
only showing top 5 rows



## Transform

In [None]:
# Import functions
from pyspark.sql import functions as F

In [None]:
# Derive a calendar 'date' column from 'timestamp' (the time-of-day part is dropped),
# then preview the result.
df = df.withColumn(
    'date',
    F.to_date('timestamp'),
)

df.show(5)

+---+-----------+-----------+--------+--------------------+-----------+-----------+----------+----+-----+---+
|_c0|  device_id|temperature|humidity|           timestamp|   latitude|  longitude|      date|year|month|day|
+---+-----------+-----------+--------+--------------------+-----------+-----------+----------+----+-----+---+
|  0|sensor-2231|         25|      49|2004-10-08 05:19:...| 39.0319385|-167.644666|2004-10-08|2004|   10|  8|
|  1|sensor-3869|         17|      42|2010-06-16 20:35:...| 43.4913955|  76.084671|2010-06-16|2010|    6| 16|
|  2|sensor-7079|         35|      58|2010-10-03 18:10:...|-39.9795415|-149.006886|2010-10-03|2010|   10|  3|
|  3|sensor-1163|         15|      64|2005-10-10 11:38:...| 28.1871005|-112.581419|2005-10-10|2005|   10| 10|
|  4|sensor-4483|         14|      65|2020-04-29 07:56:...| 89.2243515| -16.076939|2020-04-29|2020|    4| 29|
+---+-----------+-----------+--------+--------------------+-----------+-----------+----------+----+-----+---+
only showi

In [None]:
# Split 'date' into separate year / month / day-of-month columns, added in that order,
# then preview the result.
df = df.withColumns({
    'year': F.year('date'),
    'month': F.month('date'),
    'day': F.dayofmonth('date'),
})

df.show(5)

+---+-----------+-----------+--------+--------------------+-----------+-----------+----------+----+-----+---+
|_c0|  device_id|temperature|humidity|           timestamp|   latitude|  longitude|      date|year|month|day|
+---+-----------+-----------+--------+--------------------+-----------+-----------+----------+----+-----+---+
|  0|sensor-2231|         25|      49|2004-10-08 05:19:...| 39.0319385|-167.644666|2004-10-08|2004|   10|  8|
|  1|sensor-3869|         17|      42|2010-06-16 20:35:...| 43.4913955|  76.084671|2010-06-16|2010|    6| 16|
|  2|sensor-7079|         35|      58|2010-10-03 18:10:...|-39.9795415|-149.006886|2010-10-03|2010|   10|  3|
|  3|sensor-1163|         15|      64|2005-10-10 11:38:...| 28.1871005|-112.581419|2005-10-10|2005|   10| 10|
|  4|sensor-4483|         14|      65|2020-04-29 07:56:...| 89.2243515| -16.076939|2020-04-29|2020|    4| 29|
+---+-----------+-----------+--------+--------------------+-----------+-----------+----------+----+-----+---+
only showi

In [None]:
# Column means of temperature and humidity, used later to fill missing values and to
# replace out-of-range temperatures.
# Both aggregates are computed in a single `agg`, so Spark scans the data once instead
# of launching one job per column. A global aggregation always returns exactly one row,
# which is unpacked positionally (Row is a tuple).
mean_temp, mean_humidity = df.agg(
    F.mean('temperature'),
    F.mean('humidity'),
).first()

print('Mean temperature = ', mean_temp)
print('Mean humidity = ', mean_humidity)

Mean temperature =  22.491527
Mean humidity =  55.033291


In [None]:
# Fill missing temperature/humidity readings with the column means computed earlier.
# Spark casts a fill value to the target column's type, so both columns are cast to
# double first; otherwise the fractional mean would be truncated on an integer column
# or stored as text on a string column.
df = (df.withColumn('temperature', F.col('temperature').cast('double'))
        .withColumn('humidity', F.col('humidity').cast('double'))
        .fillna({'temperature': mean_temp, 'humidity': mean_humidity}))

In [None]:
# Outlier handling: a temperature reading is kept only when it lies within
# [TEMP_MIN, TEMP_MAX] (Column.between is inclusive on both ends); any other value,
# including a null, is replaced by mean_temp.
# NOTE(review): mean_temp was computed before this step, so it still includes the
# outliers being replaced here — consider recomputing it over the in-range rows only.
TEMP_MIN, TEMP_MAX = 0, 40

df = df.withColumn(
    'temperature',
    F.when(F.col('temperature').between(TEMP_MIN, TEMP_MAX), F.col('temperature'))
     .otherwise(F.lit(mean_temp)),
)

df.show(5)

+---+-----------+-----------+--------+--------------------+-----------+-----------+----------+----+-----+---+
|_c0|  device_id|temperature|humidity|           timestamp|   latitude|  longitude|      date|year|month|day|
+---+-----------+-----------+--------+--------------------+-----------+-----------+----------+----+-----+---+
|  0|sensor-2231|         25|      49|2004-10-08 05:19:...| 39.0319385|-167.644666|2004-10-08|2004|   10|  8|
|  1|sensor-3869|         17|      42|2010-06-16 20:35:...| 43.4913955|  76.084671|2010-06-16|2010|    6| 16|
|  2|sensor-7079|         35|      58|2010-10-03 18:10:...|-39.9795415|-149.006886|2010-10-03|2010|   10|  3|
|  3|sensor-1163|         15|      64|2005-10-10 11:38:...| 28.1871005|-112.581419|2005-10-10|2005|   10| 10|
|  4|sensor-4483|         14|      65|2020-04-29 07:56:...| 89.2243515| -16.076939|2020-04-29|2020|    4| 29|
+---+-----------+-----------+--------+--------------------+-----------+-----------+----------+----+-----+---+
only showi