In [1]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m3.8 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=10bde17c0f644e0935122cd15b089d0f2664294c2fd42155507b61e5030867c2
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [2]:
from pyspark.sql import SparkSession

In [3]:
spark = SparkSession.builder.appName('sojaSensores').getOrCreate()

In [7]:
df = spark.read.format('csv').option('header', True).load('sensores-iot.csv')

In [8]:
df.show()

+---+-----------+-----------+--------+--------------------+-----------+-----------+
|_c0|  device_id|temperature|humidity|           timestamp|   latitude|  longitude|
+---+-----------+-----------+--------+--------------------+-----------+-----------+
|  0|sensor-2231|         25|      49|2004-10-08 05:19:...| 39.0319385|-167.644666|
|  1|sensor-3869|         17|      42|2010-06-16 20:35:...| 43.4913955|  76.084671|
|  2|sensor-7079|         35|      58|2010-10-03 18:10:...|-39.9795415|-149.006886|
|  3|sensor-1163|         15|      64|2005-10-10 11:38:...| 28.1871005|-112.581419|
|  4|sensor-4483|         14|      65|2020-04-29 07:56:...| 89.2243515| -16.076939|
|  5|sensor-2821|         15|      52|2000-10-27 23:32:...|-55.3025525| -26.657210|
|  6|sensor-9805|         12|      37|2022-08-09 22:08:...| -25.912786| 122.693490|
|  7|sensor-1230|         10|      60|1970-02-02 17:32:...|-88.9300035|  33.377804|
|  8|sensor-4472|         11|      75|2014-02-22 06:42:...|-89.0426855| 120.

In [27]:
from pyspark.sql import functions as F

In [14]:
df = df.withColumn('date', F.to_date(F.col('timestamp')))

In [28]:
df = (df.withColumn('year', F.year(F.col('date')))
        .withColumn('month', F.month(F.col('date')))
        .withColumn('day', F.dayofmonth(F.col('date'))))

In [35]:
mean_temp = df.select(F.mean(F.col("temperature"))).collect()[0][0]
mean_humidity = df.select(F.mean(F.col("humidity"))).collect()[0][0]

print("Mean temp -> ", mean_temp)
print("Mean humidity -> ", mean_humidity)

Mean temp ->  22.491527
Mean humidity ->  55.033291


In [37]:
df = df.fillna({'temperature': mean_temp, 'humidity': mean_humidity})

In [38]:
df.count()

1000000

In [42]:
df = df.withColumn('temperature', F.when(F.col('temperature').between(0, 40), F.col('temperature')).otherwise(F.lit(mean_temp)))

+---+-----------+-----------+--------+--------------------+-----------+-----------+----------+----+-----+---+
|_c0|  device_id|temperature|humidity|           timestamp|   latitude|  longitude|      date|year|month|day|
+---+-----------+-----------+--------+--------------------+-----------+-----------+----------+----+-----+---+
|  0|sensor-2231|         25|      49|2004-10-08 05:19:...| 39.0319385|-167.644666|2004-10-08|2004|   10|  8|
|  1|sensor-3869|         17|      42|2010-06-16 20:35:...| 43.4913955|  76.084671|2010-06-16|2010|    6| 16|
|  2|sensor-7079|         35|      58|2010-10-03 18:10:...|-39.9795415|-149.006886|2010-10-03|2010|   10|  3|
|  3|sensor-1163|         15|      64|2005-10-10 11:38:...| 28.1871005|-112.581419|2005-10-10|2005|   10| 10|
|  4|sensor-4483|         14|      65|2020-04-29 07:56:...| 89.2243515| -16.076939|2020-04-29|2020|    4| 29|
|  5|sensor-2821|         15|      52|2000-10-27 23:32:...|-55.3025525| -26.657210|2000-10-27|2000|   10| 27|
|  6|senso

In [None]:
# se temperatura estiver entre 0 e 40, ta ok, usa a temperatura, senao usa mean_temp

In [43]:
df.write.parquet('parquet_tradicional_sensores.parquet')

In [44]:
df.write.option('compression', 'snappy').parquet('parquet_snappy_sensores.parquet')

In [45]:
df.write.option('compression', 'gzip').parquet('parquet_gzip_sensores.parquet')

In [48]:
df.write.partitionBy('year', 'month').parquet('parquet_year_month_sensores.parquet')

In [46]:
spark.read.parquet('parquet_tradicional_sensores.parquet').show(5)
spark.read.parquet('parquet_snappy_sensores.parquet').show(5)
spark.read.parquet('parquet_gzip_sensores.parquet').show(5)

+---+-----------+-----------+--------+--------------------+-----------+-----------+----------+----+-----+---+
|_c0|  device_id|temperature|humidity|           timestamp|   latitude|  longitude|      date|year|month|day|
+---+-----------+-----------+--------+--------------------+-----------+-----------+----------+----+-----+---+
|  0|sensor-2231|         25|      49|2004-10-08 05:19:...| 39.0319385|-167.644666|2004-10-08|2004|   10|  8|
|  1|sensor-3869|         17|      42|2010-06-16 20:35:...| 43.4913955|  76.084671|2010-06-16|2010|    6| 16|
|  2|sensor-7079|         35|      58|2010-10-03 18:10:...|-39.9795415|-149.006886|2010-10-03|2010|   10|  3|
|  3|sensor-1163|         15|      64|2005-10-10 11:38:...| 28.1871005|-112.581419|2005-10-10|2005|   10| 10|
|  4|sensor-4483|         14|      65|2020-04-29 07:56:...| 89.2243515| -16.076939|2020-04-29|2020|    4| 29|
+---+-----------+-----------+--------+--------------------+-----------+-----------+----------+----+-----+---+
only showi