# **Data Formatting Pipeline**

## Demography

In [1]:
import pyspark
from pyspark.sql import SparkSession
import duckdb
# Fetch the DuckDB JDBC driver (version 0.10.1) from Maven Central and save it as
# "duckdb.jar", the same path that is handed to Spark through "spark.jars" below.
# Note: the download is repeated every time this cell is executed.
!wget -O "duckdb.jar" "https://repo1.maven.org/maven2/org/duckdb/duckdb_jdbc/0.10.1/duckdb_jdbc-0.10.1.jar"

# # Inicializar SparkSession
# spark = SparkSession.builder \
#     .appName("Parquet to DuckDB") \
#     .getOrCreate()

# spark = SparkSession.builder \
#     .appName("Parquet to DuckDB") \
#     .master("local[*]") \
#     .getOrCreate()

# Get (or create) a local Spark session that has the DuckDB JDBC jar attached.
spark = (
    SparkSession.builder
    .appName("Parquet to DuckDB")
    .master("local[*]")
    .config("spark.jars", "duckdb.jar")
    .getOrCreate()
)

# Read the three Parquet datasets, in the order demography, mental health, society.
ds = [
    spark.read.parquet(parquet_path)
    for parquet_path in (
        '../data/demography.parquet',
        '../data/mental_health.parquet',
        '../data/society.parquet',
    )
]
demography_df, mh_df, society_df = ds

# Print a preview of every dataset as a quick sanity check.
for d in ds:
    d.show()

24/04/10 12:42:46 WARN Utils: Your hostname, pop-os resolves to a loopback address: 127.0.1.1; using 172.20.10.7 instead (on interface wlp0s20f3)
24/04/10 12:42:46 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
24/04/10 12:42:46 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


+--------------------+----+--------+-----+-----+---------+-----+----+------+------+------+--------+---------------------------------+----+
|                NAME|  YR|AREA_KM2|  CBR|  CDR|   DEATHS|   E0| GRR|   IMR|MEDAGE| MR0_4|POP_DENS|genc standard countries and areas|code|
+--------------------+----+--------+-----+-----+---------+-----+----+------+------+------+--------+---------------------------------+----+
|             Andorra|1990|     468|11.91| 5.57|    294.0|79.48|0.61|  5.85|  32.8|   7.5|   112.7|                             1990|  AD|
|United Arab Emirates|1990|   83600| NULL| NULL|     NULL| NULL|NULL|  NULL|  NULL|  NULL|    21.5|                             1990|  AE|
|         Afghanistan|1990|  652230|54.44| 22.5| 305242.0|42.19| 3.9|167.73|  17.2| 251.3|    20.8|                             1990|  AF|
| Antigua and Barbuda|1990|     443| NULL| NULL|     NULL| NULL|NULL|  NULL|  NULL|  NULL|   144.8|                             1990|  AG|
|            Anguilla|1990|

24/04/10 12:42:51 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


+-------+----+------------------------+----------------+------------------------------------+--------------------------+----------------------------+---------------------------+-----------------------------+-------------------------+---------------------------+--------------------------+----------------------------+-------------------------------+-----------------------------+--------------------------------+------------------------------+--------------------+-------------------------+-----------------------------------------+-------------------------+---------------------------+-------------------------+---------------------------+-------------------------+---------------------------+------------------------------------------------------------------------+--------------------------------------------------+----------------------------------+------------------------------------------+------------------------------------------+-------------------------------------------------------------

### Save Basic

In [2]:
# Map each Spark DataFrame to the name used for both its DuckDB file and its table.
d = {
    demography_df: "demography_df",
    mh_df: "mh_df",
    society_df: "society_df",
}

# Persist every dataset into its own DuckDB database file (<name>.db) over JDBC.
for frame, table_name in d.items():
    (
        frame.write
        .format("jdbc")
        # Spark's default save mode raises when the target table already exists,
        # which makes this cell fail on every re-run once the .db files are on
        # disk. Overwriting replaces the table so the cell can be run repeatedly.
        .mode("overwrite")
        .option("url", f"jdbc:duckdb:{table_name}.db")
        .option("dbtable", table_name)
        .option("driver", "org.duckdb.DuckDBDriver")
        .save()
    )

                                                                                

### Read Basic

In [7]:
def _read_duckdb_table(name):
    """Load table ``name`` from the DuckDB file ``<name>.db`` as a Spark DataFrame.

    Args:
        name: Used both as the database file stem in the JDBC URL and as the
            table name to read.

    Returns:
        The DataFrame produced by Spark's JDBC reader for that table.
    """
    return (
        spark.read
        .format("jdbc")
        .option("url", f"jdbc:duckdb:{name}.db")
        .option("driver", "org.duckdb.DuckDBDriver")
        .option("dbtable", name)
        .load()
    )


# Read each dataset back from its DuckDB file through the shared helper
# (previously three copy-pasted reader chains).
DEMOGRAPHY = _read_duckdb_table("demography_df")
MH = _read_duckdb_table("mh_df")
SOCIETY = _read_duckdb_table("society_df")

DEMOGRAPHY.show()
MH.show()
SOCIETY.show()

+--------------------+----+--------+-----+-----+---------+-----+----+------+------+------+--------+---------------------------------+----+
|                NAME|  YR|AREA_KM2|  CBR|  CDR|   DEATHS|   E0| GRR|   IMR|MEDAGE| MR0_4|POP_DENS|genc standard countries and areas|code|
+--------------------+----+--------+-----+-----+---------+-----+----+------+------+------+--------+---------------------------------+----+
|             Andorra|1990|     468|11.91| 5.57|    294.0|79.48|0.61|  5.85|  32.8|   7.5|   112.7|                             1990|  AD|
|United Arab Emirates|1990|   83600| NULL| NULL|     NULL| NULL|NULL|  NULL|  NULL|  NULL|    21.5|                             1990|  AE|
|         Afghanistan|1990|  652230|54.44| 22.5| 305242.0|42.19| 3.9|167.73|  17.2| 251.3|    20.8|                             1990|  AF|
| Antigua and Barbuda|1990|     443| NULL| NULL|     NULL| NULL|NULL|  NULL|  NULL|  NULL|   144.8|                             1990|  AG|
|            Anguilla|1990|

In [33]:
import duckdb

# Convert the Spark DataFrames to pandas so DuckDB can query them in-process.
demography_pd_df = demography_df.toPandas()
mh_pd_df = mh_df.toPandas()
society_pd_df = society_df.toPandas()

dataframes = [demography_pd_df, mh_pd_df, society_pd_df]
views = ['demography_view','mh_view','society_view']


# Inspect each dataset in its own throwaway in-memory DuckDB database.
for view, df in zip(views, dataframes):

    # Metadata query for the columns of the registered view.
    columns_sql = f"""
        SELECT column_name, data_type
        FROM information_schema.columns
        WHERE table_name = '{view}'
        """

    # Open a fresh in-memory DuckDB connection for this dataset.
    conn = duckdb.connect(database=':memory:', read_only=False)
    try:
        # Expose the pandas DataFrame to DuckDB under the view name.
        conn.register(view, df)

        # Materialize the view into a table named after the dataset
        # ('demography', 'mh', 'society'); previously every dataset was
        # written to a table called 'demography'.
        table = view.replace('_view', '')
        conn.execute(f"CREATE TABLE {table} AS SELECT * FROM {view}")

        # Print a few sample rows.
        result = conn.execute(f"SELECT * FROM {view} LIMIT 5").fetchall()
        print(result)

        # Print the column names and the types DuckDB assigned to them.
        result = conn.execute(columns_sql).fetchall()
        for column_name, data_type in result:
            print(f"Columna: {column_name}, Tipo: {data_type}")
    finally:
        # Close the connection even when one of the queries above fails.
        conn.close()

[('Andorra', 1990, 468, 11.91, 5.57, 294.0, 79.48, 0.61, 5.85, 32.8, 7.5, 112.7, 1990, 'AD'), ('United Arab Emirates', 1990, 83600, None, None, None, None, None, None, None, None, 21.5, 1990, 'AE'), ('Afghanistan', 1990, 652230, 54.44, 22.5, 305242.0, 42.19, 3.9, 167.73, 17.2, 251.3, 20.8, 1990, 'AF'), ('Antigua and Barbuda', 1990, 443, None, None, None, None, None, None, None, None, 144.8, 1990, 'AG'), ('Anguilla', 1990, 91, 18.91, 6.42, 54.0, 76.17, 1.08, 11.09, 24.9, 13.6, 92.4, 1990, 'AI')]
Columna: NAME, Tipo: VARCHAR
Columna: YR, Tipo: BIGINT
Columna: AREA_KM2, Tipo: BIGINT
Columna: CBR, Tipo: DOUBLE
Columna: CDR, Tipo: DOUBLE
Columna: DEATHS, Tipo: DOUBLE
Columna: E0, Tipo: DOUBLE
Columna: GRR, Tipo: DOUBLE
Columna: IMR, Tipo: DOUBLE
Columna: MEDAGE, Tipo: DOUBLE
Columna: MR0_4, Tipo: DOUBLE
Columna: POP_DENS, Tipo: DOUBLE
Columna: genc standard countries and areas, Tipo: BIGINT
Columna: code, Tipo: VARCHAR
[(0, 'Afghanistan', 'AFG', '1990', '0.16056', '0.697779', '0.101855', '4

In [24]:
def explore(db):
    """Print the name and DuckDB type of every column of a Spark DataFrame.

    The DataFrame is converted to pandas and registered as the view ``db`` in
    a throwaway in-memory DuckDB database; the column metadata of that view is
    then read from ``information_schema.columns``.

    Args:
        db: Object exposing ``toPandas()`` (a Spark DataFrame in this notebook).

    Returns:
        None. One ``Columna: <name>, Tipo: <type>`` line is printed per column.
    """
    # Keep the argument intact instead of rebinding it to the pandas copy.
    pandas_df = db.toPandas()

    # Metadata query; it filters on the view name 'db' registered below.
    column_types_sql = """
    SELECT column_name, data_type
    FROM information_schema.columns
    WHERE table_name = 'db'
    """

    conn = duckdb.connect(database=':memory:', read_only=False)
    try:
        # Expose the pandas DataFrame to DuckDB as the view 'db'.
        conn.register('db', pandas_df)

        # Copy the view's rows into a real table. The table name is arbitrary:
        # the in-memory database is discarded when the connection is closed,
        # and the metadata query reads the view 'db', not this table.
        conn.execute("CREATE TABLE demography AS SELECT * FROM db")

        result = conn.execute(column_types_sql).fetchall()

        for column_name, data_type in result:
            print(f"Columna: {column_name}, Tipo: {data_type}")
    finally:
        # Release the connection even when a statement above raises.
        conn.close()

In [32]:
# Print the column names and DuckDB types of the society dataset.
explore(society_df)

Columna: Country, Tipo: VARCHAR
Columna: Year, Tipo: BIGINT
Columna: Area (square kilometres), Tipo: DOUBLE
Columna: Total population, Tipo: DOUBLE
Columna: Population density, pers. per sq. km, Tipo: DOUBLE
Columna: Population aged 0-14, male, Tipo: DOUBLE
Columna: Population aged 0-14, female, Tipo: DOUBLE
Columna: Population aged 15-64, male, Tipo: DOUBLE
Columna: Population aged 15-64, female, Tipo: DOUBLE
Columna: Population aged 64+, male, Tipo: DOUBLE
Columna: Population aged 64+, female, Tipo: DOUBLE
Columna: Total population, male (%), Tipo: DOUBLE
Columna: Total population, female (%), Tipo: DOUBLE
Columna: Life expectancy at birth, women, Tipo: DOUBLE
Columna: Life expectancy at birth, men, Tipo: DOUBLE
Columna: Life expectancy at age 65, women, Tipo: DOUBLE
Columna: Life expectancy at age 65, men, Tipo: DOUBLE
Columna: Total fertility rate, Tipo: DOUBLE
Columna: Adolescent fertility rate, Tipo: DOUBLE
Columna: Mean age of women at birth of first child, Tipo: DOUBLE
Columna: