In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
!tar xf spark-3.1.1-bin-hadoop3.2.tgz
!pip install -q findspark

In [2]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"

In [3]:
!ls

sample_data  spark-3.1.1-bin-hadoop3.2	spark-3.1.1-bin-hadoop3.2.tgz


In [4]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
spark.conf.set("spark.sql.repl.eagerEval.enabled", True) # Property used to format output tables better
spark

In [10]:
#Total number of rows
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("TotalRows").getOrCreate()
df = spark.read.format("csv").option("header", "true").load("sensors.csv")
total_rows = df.count()
print("Total de linhas: ", total_rows)

Total de linhas:  27


In [12]:
#Number of distinct sensors present on the database;
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("DistinctSensors").getOrCreate()
df = spark.read.format("csv").option("header", "true").load("sensors.csv")
distinct_sensors = df.select("name").distinct().count()
print("Número total de sensores: ", distinct_sensors)


Número total de sensores:  12


In [13]:
#Number of rows for the sensor PPL340;
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("SensorRows").getOrCreate()
df = spark.read.format("csv").option("header", "true").load("sensors.csv")
sensor_rows = df.filter(df["name"] == "PPL340").count()
print("Número de linhas do sensor PPL340: ", sensor_rows)

Número de linhas do sensor PPL340:  6


In [15]:
#The number of rows by year for the sensor PPL340;
from pyspark.sql import SparkSession
from pyspark.sql.functions import count
spark = SparkSession.builder.appName("SensorRowsByYear").getOrCreate()
df = spark.read.format("csv").option("header", "true").load("sensors.csv")
df_ppl340 = df.filter(df["name"] == "PPL340")
rows_by_year = df_ppl340.groupBy("year").agg(count("*").alias("total"))
rows_by_year.show()

+----+-----+
|year|total|
+----+-----+
|2022|    4|
|2021|    2|
+----+-----+



In [43]:
#Average number of readings by year for the sensor PPL340;
from pyspark.sql.functions import avg, count
ppl340_df = df.filter(df["name"] == "PPL340")
ppl340_by_year = ppl340_df.groupBy("year").agg(count("*").alias("total"))
ppl340_avg_by_year = ppl340_by_year.agg(avg("total")).collect()[0][0]
ppl340_by_year.withColumn("Abaixo", ppl340_by_year["total"] < ppl340_avg_by_year).show()



+----+-----+------+
|year|total|Abaixo|
+----+-----+------+
|2022|    8| false|
|2021|    4|  true|
+----+-----+------+



In [57]:

#For PPL340, Identify the years in which the number of readings is less than the average
from pyspark.sql import functions as F
df = spark.read.format("csv").option("header", "true").load("sensors.csv")
# Filtra somente o sensor PPL340
ppl340_df = df.filter(df['name'] == 'PPL340')
# Agrupa os dados por ano, contando os dados
grouped_df = ppl340_df.groupBy(F.year(ppl340_df['year'])).agg(F.count('year').alias('total'))
# Calcula a média dos anos
average_count = grouped_df.agg(F.avg('total')).first()[0]
# Filtra somente os dados necessários
result_df = grouped_df.filter(grouped_df['total'] < average_count)
result_df.show()



+----------+-----+
|year(year)|total|
+----------+-----+
|      2021|   13|
+----------+-----+

