In [1]:
from utils import validate_env, setup_env, log_env
import os
import sys
from pyspark import SparkConf
from pyspark.sql import SparkSession

setup_env()
validate_env()
log_env()

SPARK_HOME = /root/uni-projects/bdm2/.venv/lib/python3.10/site-packages/pyspark
JAVA_HOME = /root/.sdkman/candidates/java/current
PATH = /root/uni-projects/bdm2/.venv/lib/python3.10/site-packages/pyspark/bin:/root/uni-projects/bdm2/.venv/bin:/root/.vscode-server/bin/dc96b837cf6bb4af9cd736aa3af08cf8279f7685/bin/remote-cli:/root/.tfenv/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/games:/usr/local/games:/usr/lib/wsl/lib:/mnt/c/Users/Akos Schneider/.jdks/oracle-8/bin:/mnt/c/Program Files/Common Files/Oracle/Java/javapath:/mnt/c/Program Files (x86)/Common Files/Oracle/Java/javapath:/mnt/c/WINDOWS/system32:/mnt/c/WINDOWS:/mnt/c/WINDOWS/System32/Wbem:/mnt/c/WINDOWS/System32/WindowsPowerShell/v1.0/:/mnt/c/WINDOWS/System32/OpenSSH/:/mnt/c/Program Files (x86)/NVIDIA Corporation/PhysX/Common:/mnt/c/Program Files/Git/cmd:/mnt/c/Program Files/TortoiseGit/bin:/mnt/c/Users/Akos Schneider/apache-maven-3.8.6/bin:/mnt/c/Program Files/nodejs/:/mnt/c/ProgramData/chocolatey/bin:/mn

In [2]:
spark = None

In [31]:
if spark:
    spark.stop()
# Create the configuration in the local machine and give a name to the application
conf = SparkConf() \
    .set("spark.master", "local") \
    .set("spark.app.name", "Spark Dataframes Tutorial") \
    .set("spark.jars.packages", "org.apache.spark:spark-avro_2.12:3.5.1,org.mongodb.spark:mongo-spark-connector_2.12:10.3.0")


# Create the session 
spark = SparkSession.builder \
    .config(conf=conf) \
    .config("spark.mongodb.read.connection.uri", "mongodb://127.0.0.1:27017/") \
    .config("spark.mongodb.write.connection.uri", "mongodb://127.0.0.1:27017/") \
    .getOrCreate()
print(f"Python version = {spark.sparkContext.pythonVer}")
print(f"Spark version = {spark.version}")
print(spark.sparkContext.getConf().getAll())

Python version = 3.10
Spark version = 3.5.1
[('spark.master', 'local'), ('spark.driver.extraJavaOptions', '-Djava.net.preferIPv6Addresses=false -XX:+IgnoreUnrecognizedVMOptions --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/jdk.internal.ref=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL-UNNAMED --add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED -Djdk.reflect.useDirectMethodHandle=false'), ('spark.app.id', 'local-1715867836

## Income and Idealista lookup tables

In [32]:
lookup_dir = "../data/persistent-landing-zone/lookup"
# get files
files = os.listdir(lookup_dir)
# keep only those that start with either idealista or income
files = [file for file in files if file.startswith("idealista") or file.startswith("income")]
# add the path to the files
files = [f"{lookup_dir}/{file}" for file in files]
# create a list of dataframes
dfs = []
for file in files:
    # read the file
    df = spark.read.format("avro").load(file)
    # append the dataframe to the list
    dfs.append(df)

In [33]:
for df in dfs:
    df.printSchema()

root
 |-- district: string (nullable = true)
 |-- neighborhood: string (nullable = true)
 |-- district_n_reconciled: string (nullable = true)
 |-- district_n: string (nullable = true)
 |-- district_id: string (nullable = true)
 |-- neighborhood_n_reconciled: string (nullable = true)
 |-- neighborhood_n: string (nullable = true)
 |-- neighborhood_id: string (nullable = true)

root
 |-- district: string (nullable = true)
 |-- neighborhood: string (nullable = true)
 |-- district_n_reconciled: string (nullable = true)
 |-- district_n: string (nullable = true)
 |-- district_id: string (nullable = true)
 |-- neighborhood_n_reconciled: string (nullable = true)
 |-- neighborhood_n: string (nullable = true)
 |-- neighborhood_id: string (nullable = true)



In [34]:
# union the dataframes
df = dfs[0].union(dfs[1])
df.printSchema()

root
 |-- district: string (nullable = true)
 |-- neighborhood: string (nullable = true)
 |-- district_n_reconciled: string (nullable = true)
 |-- district_n: string (nullable = true)
 |-- district_id: string (nullable = true)
 |-- neighborhood_n_reconciled: string (nullable = true)
 |-- neighborhood_n: string (nullable = true)
 |-- neighborhood_id: string (nullable = true)



In [35]:
# deduplicate the dataframe
print(f"Number of rows before deduplication: {df.count()}")
df = df.dropDuplicates()
print(f"Number of rows after deduplication: {df.count()}")

Number of rows before deduplication: 127
Number of rows after deduplication: 113


In [36]:
# count number of unique values in the column "neighborhood_id"
print(df.select("neighborhood_id", "district_n_reconciled", "neighborhood_n_reconciled").distinct().count())

73


In [37]:
df.count()

113

In [38]:
# drop duplicates based on "neighborhood_id", "district_n_reconciled", "neighborhood_n_reconciled"
print(f"Number of rows before deduplication: {df.count()}")
df = df.dropDuplicates(["neighborhood_id", "district_n_reconciled", "neighborhood_n_reconciled"])
print(f"Number of rows after deduplication: {df.count()}")

Number of rows before deduplication: 113
Number of rows after deduplication: 73


In [39]:
df.show()

+--------------+--------------------+---------------------+--------------+-----------+-------------------------+--------------------+---------------+
|      district|        neighborhood|district_n_reconciled|    district_n|district_id|neighborhood_n_reconciled|      neighborhood_n|neighborhood_id|
+--------------+--------------------+---------------------+--------------+-----------+-------------------------+--------------------+---------------+
|      Eixample|la Nova Esquerra ...|             Eixample|      eixample|     Q64124|     La Nova Esquerra ...|la nova esquerra ...|       Q1026658|
|    Sant Martí|la Vila Olímpica ...|           Sant Martí|    sant marti|    Q250935|     La Vila Olímpica ...|la vila olimpica ...|       Q1167101|
|    Sant Martí|         el Poblenou|           Sant Martí|    sant marti|    Q250935|              El Poblenou|         el poblenou|       Q1404773|
|   Sant Andreu|       Baró de Viver|          Sant Andreu|   sant andreu|   Q1650230|            Ba

In [40]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
from utils import fix_encoding

fix_encoding_udf = udf(fix_encoding, StringType())

# for each string type column, apply the fix_encoding_udf function
for col in df.columns:
    if df.schema[col].dataType == StringType():
        df = df.withColumn(col, fix_encoding_udf(col))


In [41]:
df.show()

[Stage 38:>                                                         (0 + 1) / 1]

+--------------+--------------------+---------------------+--------------+-----------+-------------------------+--------------------+---------------+
|      district|        neighborhood|district_n_reconciled|    district_n|district_id|neighborhood_n_reconciled|      neighborhood_n|neighborhood_id|
+--------------+--------------------+---------------------+--------------+-----------+-------------------------+--------------------+---------------+
|      Eixample|la Nova Esquerra ...|             Eixample|      eixample|     Q64124|     La Nova Esquerra ...|la nova esquerra ...|       Q1026658|
|    Sant Martí|la Vila Olímpica ...|           Sant Martí|    sant marti|    Q250935|     La Vila Olímpica ...|la vila olimpica ...|       Q1167101|
|    Sant Martí|         el Poblenou|           Sant Martí|    sant marti|    Q250935|              El Poblenou|         el poblenou|       Q1404773|
|   Sant Andreu|       Baró de Viver|          Sant Andreu|   sant andreu|   Q1650230|            Ba

Error decoding: 'utf-8' codec can't decode byte 0xed in position 9: unexpected end of data. Trying ftfy...
Error decoding: 'utf-8' codec can't decode byte 0xed in position 10: invalid continuation byte. Trying ftfy...
Error decoding: 'utf-8' codec can't decode byte 0xed in position 9: unexpected end of data. Trying ftfy...
Error decoding: 'utf-8' codec can't decode byte 0xed in position 10: invalid continuation byte. Trying ftfy...
Error decoding: 'utf-8' codec can't decode byte 0xed in position 9: unexpected end of data. Trying ftfy...
Error decoding: 'utf-8' codec can't decode byte 0xed in position 9: unexpected end of data. Trying ftfy...
Error decoding: 'utf-8' codec can't decode byte 0xf3 in position 3: invalid continuation byte. Trying ftfy...
Error decoding: 'utf-8' codec can't decode byte 0xf3 in position 3: invalid continuation byte. Trying ftfy...
Error decoding: 'utf-8' codec can't decode byte 0xf3 in position 13: unexpected end of data. Trying ftfy...
Error decoding: 'utf-8

Outlier detection is not relevant here, as this is a simple lookup table.

In [42]:
df.show(5)

+--------------+--------------------+---------------------+--------------+-----------+-------------------------+--------------------+---------------+
|      district|        neighborhood|district_n_reconciled|    district_n|district_id|neighborhood_n_reconciled|      neighborhood_n|neighborhood_id|
+--------------+--------------------+---------------------+--------------+-----------+-------------------------+--------------------+---------------+
|      Eixample|la Nova Esquerra ...|             Eixample|      eixample|     Q64124|     La Nova Esquerra ...|la nova esquerra ...|       Q1026658|
|    Sant Martí|la Vila Olímpica ...|           Sant Martí|    sant marti|    Q250935|     La Vila Olímpica ...|la vila olimpica ...|       Q1167101|
|    Sant Martí|         el Poblenou|           Sant Martí|    sant marti|    Q250935|              El Poblenou|         el poblenou|       Q1404773|
|   Sant Andreu|       Baró de Viver|          Sant Andreu|   sant andreu|   Q1650230|            Ba

Error decoding: 'utf-8' codec can't decode byte 0xed in position 9: unexpected end of data. Trying ftfy...
Error decoding: 'utf-8' codec can't decode byte 0xed in position 10: invalid continuation byte. Trying ftfy...
Error decoding: 'utf-8' codec can't decode byte 0xed in position 9: unexpected end of data. Trying ftfy...
Error decoding: 'utf-8' codec can't decode byte 0xed in position 10: invalid continuation byte. Trying ftfy...
Error decoding: 'utf-8' codec can't decode byte 0xed in position 9: unexpected end of data. Trying ftfy...
Error decoding: 'utf-8' codec can't decode byte 0xed in position 9: unexpected end of data. Trying ftfy...
Error decoding: 'utf-8' codec can't decode byte 0xf3 in position 3: invalid continuation byte. Trying ftfy...
Error decoding: 'utf-8' codec can't decode byte 0xf3 in position 3: invalid continuation byte. Trying ftfy...
Error decoding: 'utf-8' codec can't decode byte 0xf3 in position 13: unexpected end of data. Trying ftfy...
Error decoding: 'utf-8

In [43]:
write_config = {
    "writeConcern.w": "majority",
    "replaceDocument": "false",
    "idFieldList": "neighborhood_id",  # Specify the field to use for upsert
    "operationType": "update",  # Specify the update operation
    "upsertDocument": "true"  # Enable upsert logic
}

df.write.format("mongodb") \
    .mode("append") \
    .option("database", "bdm") \
    .option("collection", "location-lookup") \
    .options(**write_config) \
    .save()

24/05/16 15:57:49 WARN CaseInsensitiveStringMap: Converting duplicated key idfieldlist into CaseInsensitiveStringMap.
24/05/16 15:57:49 WARN CaseInsensitiveStringMap: Converting duplicated key upsertDocument into CaseInsensitiveStringMap.
24/05/16 15:57:49 WARN CaseInsensitiveStringMap: Converting duplicated key replaceDocument into CaseInsensitiveStringMap.
24/05/16 15:57:49 WARN CaseInsensitiveStringMap: Converting duplicated key operationType into CaseInsensitiveStringMap.
24/05/16 15:57:49 WARN CaseInsensitiveStringMap: Converting duplicated key writeConcern.w into CaseInsensitiveStringMap.
Error decoding: 'utf-8' codec can't decode byte 0xed in position 9: unexpected end of data. Trying ftfy...
Error decoding: 'utf-8' codec can't decode byte 0xed in position 10: invalid continuation byte. Trying ftfy...
Error decoding: 'utf-8' codec can't decode byte 0xed in position 9: unexpected end of data. Trying ftfy...
Error decoding: 'utf-8' codec can't decode byte 0xed in position 10: inva

## Air quality lookup tables

In [83]:
lookup_dir = "../data/persistent-landing-zone/lookup"
# get files
files = os.listdir(lookup_dir)
# keep only those that start with a year (4 digits)
files = [file for file in files if file[:4].isdigit()]
years = [file[:4] for file in files]
# add the path to the files
files = [f"{lookup_dir}/{file}" for file in files]
# create a list of dataframes
airquality_dfs = {}
for file, year in zip(files, years):
    # read the file
    df = spark.read.format("avro").load(file)
    # append the dataframe to the list
    airquality_dfs[year] = df

In [84]:
# change all column names to lowercase
for year in airquality_dfs:
    df = airquality_dfs[year]
    for col in df.columns:
        df = df.withColumnRenamed(col, col.lower())
    airquality_dfs[year] = df

In [85]:
checked_pairs = []
# check pairwise schema equality
for year1 in airquality_dfs:
    for year2 in airquality_dfs:
        if year1 != year2 and airquality_dfs[year1].schema != airquality_dfs[year2].schema and (year1, year2) not in checked_pairs:
            checked_pairs.append((year1, year2))
            checked_pairs.append((year2, year1))
            print(f"Schema of {year1} is different from schema of {year2}")
            # print the diff
            schema1 = airquality_dfs[year1].schema
            schema2 = airquality_dfs[year2].schema

            for field1, field2 in zip(schema1, schema2):
                if field1 != field2:
                    print(f"{field1.name} != {field2.name}")
                    print(f"{field1.dataType} != {field2.dataType}")
                    print(f"{field1.nullable} != {field2.nullable}")
                    print(f"{field1.metadata} != {field2.metadata}")
                    print("\n")

Schema of 2023 is different from schema of 2018
estacio != nom_cabina
IntegerType() != StringType()
True != True
{} != {}


nom_cabina != codi_dtes
StringType() != StringType()
True != True
{} != {}


codi_dtes != zqa
StringType() != IntegerType()
True != True
{} != {}


zqa != codi_eoi
IntegerType() != IntegerType()
True != True
{} != {}


codi_eoi != longitud
IntegerType() != DoubleType()
True != True
{} != {}


longitud != latitud
DoubleType() != DoubleType()
True != True
{} != {}


latitud != ubicacio
DoubleType() != StringType()
True != True
{} != {}


ubicacio != codi_districte
StringType() != IntegerType()
True != True
{} != {}


codi_districte != nom_districte
IntegerType() != StringType()
True != True
{} != {}


nom_districte != codi_barri
StringType() != IntegerType()
True != True
{} != {}


codi_barri != nom_barri
IntegerType() != StringType()
True != True
{} != {}


nom_barri != ocupacio_sol
StringType() != StringType()
True != True
{} != {}


clas_1 != emissions_properes
S

In [86]:
# print the schema of 2018 and 2019
df_2018 = airquality_dfs["2018"]
df_2019 = airquality_dfs["2019"]

In [87]:
df_2018.printSchema()

root
 |-- nom_cabina: string (nullable = true)
 |-- codi_dtes: string (nullable = true)
 |-- zqa: integer (nullable = true)
 |-- codi_eoi: integer (nullable = true)
 |-- longitud: double (nullable = true)
 |-- latitud: double (nullable = true)
 |-- ubicacio: string (nullable = true)
 |-- codi_districte: integer (nullable = true)
 |-- nom_districte: string (nullable = true)
 |-- codi_barri: integer (nullable = true)
 |-- nom_barri: string (nullable = true)
 |-- ocupacio_sol: string (nullable = true)
 |-- emissions_properes: string (nullable = true)
 |-- contaminant_1: string (nullable = true)
 |-- contaminant_2: string (nullable = true)
 |-- contaminant_3: string (nullable = true)



In [88]:
df_2019.printSchema()

root
 |-- estacio: integer (nullable = true)
 |-- nom_cabina: string (nullable = true)
 |-- codi_dtes: string (nullable = true)
 |-- zqa: integer (nullable = true)
 |-- codi_eoi: integer (nullable = true)
 |-- longitud: double (nullable = true)
 |-- latitud: double (nullable = true)
 |-- ubicacio: string (nullable = true)
 |-- codi_districte: integer (nullable = true)
 |-- nom_districte: string (nullable = true)
 |-- codi_barri: integer (nullable = true)
 |-- nom_barri: string (nullable = true)
 |-- clas_1: string (nullable = true)
 |-- clas_2: string (nullable = true)
 |-- codi_contaminant: integer (nullable = true)



In [89]:
# get unique nom_cabina; Estacio value pairs
df_2019.select("nom_cabina", "estacio").distinct().show()

# save to a dict
cabina_estacio_2019 = df_2019.select("nom_cabina", "estacio").distinct().rdd.collectAsMap()
cabina_estacio_2019

+--------------------+-------+
|          nom_cabina|estacio|
+--------------------+-------+
|Barcelona - Eixample|     43|
|Barcelona - Palau...|     57|
|Barcelona - Poblenou|      4|
|Barcelona - Ciuta...|     50|
|   Barcelona - Sants|     42|
| Barcelona - GrÃ cia|     44|
|Barcelona - Vall ...|     54|
+--------------------+-------+



{'Barcelona - Eixample': 43,
 'Barcelona - Palau Reial': 57,
 'Barcelona - Poblenou': 4,
 'Barcelona - Ciutadella': 50,
 'Barcelona - Sants': 42,
 'Barcelona - GrÃ\xa0cia': 44,
 'Barcelona - Vall Hebron': 54}

In [90]:
# add Estacio column to 2018 dataframe
from pyspark.sql.functions import lit
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

df_2018 = df_2018.withColumn("estacio", lit(None))

# fill in the Estacio column with the values from the dictionary
def fill_estacio(nom_cabina):
    return cabina_estacio_2019.get(nom_cabina, None)

fill_estacio_udf = udf(fill_estacio, StringType())

df_2018 = df_2018.withColumn("estacio", fill_estacio_udf("nom_cabina"))
df_2018.show()

+--------------------+---------+---+--------+--------+-------+--------------------+--------------+--------------+----------+--------------------+------------+------------------+-------------+-------------+-------------+-------+
|          nom_cabina|codi_dtes|zqa|codi_eoi|longitud|latitud|            ubicacio|codi_districte| nom_districte|codi_barri|           nom_barri|ocupacio_sol|emissions_properes|contaminant_1|contaminant_2|contaminant_3|estacio|
+--------------------+---------+---+--------+--------+-------+--------------------+--------------+--------------+----------+--------------------+------------+------------------+-------------+-------------+-------------+-------+
|Barcelona - Ciuta...|       IL|  1| 8019050|  2.1874|41.3864|Parc de la Ciutad...|             1|  Ciutat Vella|         4|Sant Pere, Santa ...|      Urbana|              Fons|          NO2|           O3|         NULL|     50|
|Barcelona - Eixample|       IH|  1| 8019043|  2.1538|41.3853|Av. Roma - c/ Com...|     

In [91]:
df_2019.show(5)

+-------+--------------------+---------+---+--------+--------+-------+--------------------+--------------+-------------+----------+--------------------+------+--------+----------------+
|estacio|          nom_cabina|codi_dtes|zqa|codi_eoi|longitud|latitud|            ubicacio|codi_districte|nom_districte|codi_barri|           nom_barri|clas_1|  clas_2|codi_contaminant|
+-------+--------------------+---------+---+--------+--------+-------+--------------------+--------------+-------------+----------+--------------------+------+--------+----------------+
|     50|Barcelona - Ciuta...|       IL|  1| 8019050|  2.1874|41.3864|Parc de la Ciutad...|             1| Ciutat Vella|         4|Sant Pere, Santa ...|Urbana|    Fons|               8|
|     50|Barcelona - Ciuta...|       IL|  1| 8019050|  2.1874|41.3864|Parc de la Ciutad...|             1| Ciutat Vella|         4|Sant Pere, Santa ...|Urbana|    Fons|              14|
|     50|Barcelona - Ciuta...|       IL|  1| 8019050|  2.1874|41.3864|

ocupacio_sol == clas_1

emissions_properes == clas_2

codi_contaminant in 2018, the rest contains:
- contaminant_1: string (nullable = true)
- contaminant_2: string (nullable = true)
- contaminant_3: string (nullable = true)

information might be valuable from 2018, so we will store it in a separate collection, however incoming new data will be stored in the same collection along with the rest of the data from 2019.

In [92]:
# filter out from all dfs where codi_districte is not in the range of 1 and 10, and codi_barri is not in the range of 1 and 73
for year in airquality_dfs:
    df = airquality_dfs[year]
    print(f"Number of rows for year {year} before filtering: {df.count()}")
    df = df.filter(df['codi_districte'].between(1, 10) & df['codi_barri'].between(1, 73))
    print(f"Number of rows for year {year} after filtering: {df.count()}")
    airquality_dfs[year] = df

Number of rows for year 2023 before filtering: 54
Number of rows for year 2023 after filtering: 54
Number of rows for year 2022 before filtering: 44
Number of rows for year 2022 after filtering: 44
Number of rows for year 2018 before filtering: 7
Number of rows for year 2018 after filtering: 7
Number of rows for year 2021 before filtering: 50
Number of rows for year 2021 after filtering: 50
Number of rows for year 2019 before filtering: 39
Number of rows for year 2019 after filtering: 39


In [93]:
# find nulls
for col in df_2018.columns:
    print(f"Number of nulls in column {col}: {df_2018.filter(df_2018[col].isNull()).count()}")

Number of nulls in column nom_cabina: 0
Number of nulls in column codi_dtes: 0
Number of nulls in column zqa: 0
Number of nulls in column codi_eoi: 0
Number of nulls in column longitud: 0
Number of nulls in column latitud: 0
Number of nulls in column ubicacio: 0
Number of nulls in column codi_districte: 0
Number of nulls in column nom_districte: 0
Number of nulls in column codi_barri: 0
Number of nulls in column nom_barri: 0
Number of nulls in column ocupacio_sol: 0
Number of nulls in column emissions_properes: 0
Number of nulls in column contaminant_1: 0
Number of nulls in column contaminant_2: 2
Number of nulls in column contaminant_3: 2
Number of nulls in column estacio: 0


In [94]:
# for each string type column, apply the fix_encoding_udf function
for col in df_2018.columns:
    if df_2018.schema[col].dataType == StringType():
        df_2018 = df_2018.withColumn(col, fix_encoding_udf(col))

In [95]:
df_2018.show(5)

+--------------------+---------+---+--------+--------+-------+--------------------+--------------+-------------+----------+--------------------+------------+------------------+-------------+-------------+-------------+-------+
|          nom_cabina|codi_dtes|zqa|codi_eoi|longitud|latitud|            ubicacio|codi_districte|nom_districte|codi_barri|           nom_barri|ocupacio_sol|emissions_properes|contaminant_1|contaminant_2|contaminant_3|estacio|
+--------------------+---------+---+--------+--------+-------+--------------------+--------------+-------------+----------+--------------------+------------+------------------+-------------+-------------+-------------+-------+
|Barcelona - Ciuta...|       IL|  1| 8019050|  2.1874|41.3864|Parc de la Ciutad...|             1| Ciutat Vella|         4|Sant Pere, Santa ...|      Urbana|              Fons|          NO2|           O3|         NULL|     50|
|Barcelona - Eixample|       IH|  1| 8019043|  2.1538|41.3853|Av. Roma - c/ Com...|         

Error decoding: 'int' object has no attribute 'encode'. Trying ftfy...
ftfy failed: object of type 'int' has no len(). Returning original string.
Error decoding: 'int' object has no attribute 'encode'. Trying ftfy...
ftfy failed: object of type 'int' has no len(). Returning original string.
Error decoding: 'int' object has no attribute 'encode'. Trying ftfy...
ftfy failed: object of type 'int' has no len(). Returning original string.
Error decoding: 'int' object has no attribute 'encode'. Trying ftfy...
ftfy failed: object of type 'int' has no len(). Returning original string.
Error decoding: 'int' object has no attribute 'encode'. Trying ftfy...
ftfy failed: object of type 'int' has no len(). Returning original string.
Error decoding: 'int' object has no attribute 'encode'. Trying ftfy...
ftfy failed: object of type 'int' has no len(). Returning original string.


In [96]:
df_2018.write.format("mongodb") \
    .mode("overwrite") \
    .option("database", "bdm") \
    .option("collection", "airquality-lookup-2018") \
    .save()

Error decoding: 'int' object has no attribute 'encode'. Trying ftfy...
ftfy failed: object of type 'int' has no len(). Returning original string.
Error decoding: 'int' object has no attribute 'encode'. Trying ftfy...
ftfy failed: object of type 'int' has no len(). Returning original string.
Error decoding: 'int' object has no attribute 'encode'. Trying ftfy...
ftfy failed: object of type 'int' has no len(). Returning original string.
Error decoding: 'int' object has no attribute 'encode'. Trying ftfy...
ftfy failed: object of type 'int' has no len(). Returning original string.
Error decoding: 'int' object has no attribute 'encode'. Trying ftfy...
ftfy failed: object of type 'int' has no len(). Returning original string.
Error decoding: 'int' object has no attribute 'encode'. Trying ftfy...
ftfy failed: object of type 'int' has no len(). Returning original string.
Error decoding: 'int' object has no attribute 'encode'. Trying ftfy...
ftfy failed: object of type 'int' has no len(). Retur

In [97]:
# remove 2018 from the dictionary
airquality_dfs.pop("2018")

DataFrame[nom_cabina: string, codi_dtes: string, zqa: int, codi_eoi: int, longitud: double, latitud: double, ubicacio: string, codi_districte: int, nom_districte: string, codi_barri: int, nom_barri: string, ocupacio_sol: string, emissions_properes: string, contaminant_1: string, contaminant_2: string, contaminant_3: string]

In [98]:
# add a year column to each dataframe
for year in airquality_dfs:
    df = airquality_dfs[year]
    df = df.withColumn("year", lit(int(year)))
    airquality_dfs[year] = df

In [99]:
airquality_dfs['2019'].show(3)

+-------+--------------------+---------+---+--------+--------+-------+--------------------+--------------+-------------+----------+--------------------+------+------+----------------+----+
|estacio|          nom_cabina|codi_dtes|zqa|codi_eoi|longitud|latitud|            ubicacio|codi_districte|nom_districte|codi_barri|           nom_barri|clas_1|clas_2|codi_contaminant|year|
+-------+--------------------+---------+---+--------+--------+-------+--------------------+--------------+-------------+----------+--------------------+------+------+----------------+----+
|     50|Barcelona - Ciuta...|       IL|  1| 8019050|  2.1874|41.3864|Parc de la Ciutad...|             1| Ciutat Vella|         4|Sant Pere, Santa ...|Urbana|  Fons|               8|2019|
|     50|Barcelona - Ciuta...|       IL|  1| 8019050|  2.1874|41.3864|Parc de la Ciutad...|             1| Ciutat Vella|         4|Sant Pere, Santa ...|Urbana|  Fons|              14|2019|
|     50|Barcelona - Ciuta...|       IL|  1| 8019050|  

In [100]:
# merge all dataframes
airqual_lookup_from_2019_df = None
for year in airquality_dfs:
    if airqual_lookup_from_2019_df is None:
        airqual_lookup_from_2019_df = airquality_dfs[year]
    else:
        airqual_lookup_from_2019_df = airqual_lookup_from_2019_df.union(airquality_dfs[year])

In [101]:
sum = 0
for year in airquality_dfs:
    print(airquality_dfs[year].count())
    sum += airquality_dfs[year].count()
print("Sum:", sum)

54
44
50
39
Sum: 187


In [102]:
airqual_lookup_from_2019_df.count()

187

In [103]:
# deduplicate the dataframe
print(f"Number of rows before deduplication: {airqual_lookup_from_2019_df.count()}")
airqual_lookup_from_2019_df = airqual_lookup_from_2019_df.dropDuplicates()
print(f"Number of rows after deduplication: {airqual_lookup_from_2019_df.count()}")

Number of rows before deduplication: 187
Number of rows after deduplication: 187


In [104]:
airqual_lookup_from_2019_df.show()

+-------+--------------------+---------+---+--------+--------+-------+--------------------+--------------+--------------+----------+--------------------+------+--------+----------------+----+
|estacio|          nom_cabina|codi_dtes|zqa|codi_eoi|longitud|latitud|            ubicacio|codi_districte| nom_districte|codi_barri|           nom_barri|clas_1|  clas_2|codi_contaminant|year|
+-------+--------------------+---------+---+--------+--------+-------+--------------------+--------------+--------------+----------+--------------------+------+--------+----------------+----+
|     43|Barcelona - Eixample|       IH|  1| 8019043|  2.1538|41.3853|Av. Roma - c/ Com...|             5|      Eixample|         9|la Nova Esquerra ...|Urbana|TrÃ nsit|              12|2023|
|     50|Barcelona - Ciuta...|       IL|  1| 8019050|  2.1874|41.3864|Parc de la Ciutad...|             1|  Ciutat Vella|         4|Sant Pere, Santa ...|Urbana|    Fons|               7|2023|
|     44| Barcelona - GrÃ cia|       IJ|

In [105]:
# for each string type column, apply the fix_encoding_udf function
for col in airqual_lookup_from_2019_df.columns:
    if airqual_lookup_from_2019_df.schema[col].dataType == StringType():
        airqual_lookup_from_2019_df = airqual_lookup_from_2019_df.withColumn(col, fix_encoding_udf(col))

In [106]:
airqual_lookup_from_2019_df.show(10)

+-------+--------------------+---------+---+--------+--------+-------+--------------------+--------------+-------------+----------+--------------------+------+-------+----------------+----+
|estacio|          nom_cabina|codi_dtes|zqa|codi_eoi|longitud|latitud|            ubicacio|codi_districte|nom_districte|codi_barri|           nom_barri|clas_1| clas_2|codi_contaminant|year|
+-------+--------------------+---------+---+--------+--------+-------+--------------------+--------------+-------------+----------+--------------------+------+-------+----------------+----+
|     43|Barcelona - Eixample|       IH|  1| 8019043|  2.1538|41.3853|Av. Roma - c/ Com...|             5|     Eixample|         9|la Nova Esquerra ...|Urbana|Trànsit|              12|2023|
|     50|Barcelona - Ciuta...|       IL|  1| 8019050|  2.1874|41.3864|Parc de la Ciutad...|             1| Ciutat Vella|         4|Sant Pere, Santa ...|Urbana|   Fons|               7|2023|
|     44|  Barcelona - Gràcia|       IJ|  1| 80190

In [107]:
airqual_lookup_from_2019_df.printSchema()

root
 |-- estacio: integer (nullable = true)
 |-- nom_cabina: string (nullable = true)
 |-- codi_dtes: string (nullable = true)
 |-- zqa: integer (nullable = true)
 |-- codi_eoi: integer (nullable = true)
 |-- longitud: double (nullable = true)
 |-- latitud: double (nullable = true)
 |-- ubicacio: string (nullable = true)
 |-- codi_districte: integer (nullable = true)
 |-- nom_districte: string (nullable = true)
 |-- codi_barri: integer (nullable = true)
 |-- nom_barri: string (nullable = true)
 |-- clas_1: string (nullable = true)
 |-- clas_2: string (nullable = true)
 |-- codi_contaminant: integer (nullable = true)
 |-- year: integer (nullable = false)



In [108]:
# find nulls
for col in airqual_lookup_from_2019_df.columns:
    print(f"Number of nulls in column {col}: {airqual_lookup_from_2019_df.filter(airqual_lookup_from_2019_df[col].isNull()).count()}")

Number of nulls in column estacio: 0


                                                                                

Number of nulls in column nom_cabina: 0
Number of nulls in column codi_dtes: 0
Number of nulls in column zqa: 0
Number of nulls in column codi_eoi: 0
Number of nulls in column longitud: 0
Number of nulls in column latitud: 0
Number of nulls in column ubicacio: 0
Number of nulls in column codi_districte: 0
Number of nulls in column nom_districte: 0
Number of nulls in column codi_barri: 0
Number of nulls in column nom_barri: 0
Number of nulls in column clas_1: 0
Number of nulls in column clas_2: 0
Number of nulls in column codi_contaminant: 0
Number of nulls in column year: 0


In [110]:
airqual_lookup_from_2019_df.write.format("mongodb") \
    .mode("overwrite") \
    .option("database", "bdm") \
    .option("collection", "airquality-lookup-after-2018") \
    .save()