# PySpark joinak


In [48]:
# Import necessary libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, round
from functools import reduce

# Create (or reuse) the Spark session for this notebook and keep a handle
# to its SparkContext.
spark = (
    SparkSession.builder
    .appName("Join DataFrames")
    .getOrCreate()
)
sc = spark.sparkContext

In [49]:
# Load the three grade files as DataFrames.
# NOTE(review): header=True makes Spark treat the first line of each file as
# column names, and the next cell then replaces those names -- confirm the files
# really start with a header row, otherwise their first record is being dropped.
df_fisica = spark.read.csv("txt/notas_fisica.txt", header=True)
df_mates = spark.read.csv("txt/notas_mates.txt", header=True)
df_ingles = spark.read.csv("txt/notas_ingles.txt", header=True)

In [50]:
# Give every frame the same two-column layout: the student name ("izena",
# used later as the join key) and a subject-specific grade column.
df_fisica, df_mates, df_ingles = (
    frame.toDF("izena", grade_column)
    for frame, grade_column in (
        (df_fisica, "notas_fisica"),
        (df_mates, "notas_mates"),
        (df_ingles, "notas_ingles"),
    )
)


In [51]:
# Remove duplicate rows from each grade table.
# dropDuplicates() without a subset only removes rows that are identical in
# every column, so a student listed twice with two different grades is kept twice.
df_fisica = df_fisica.dropDuplicates()
# Fixed: this line used the undefined name `df_mate`, which raises NameError on a
# fresh kernel and left `df_mates` (the frame used by the join) un-deduplicated.
df_mates = df_mates.dropDuplicates()
df_ingles = df_ingles.dropDuplicates()

In [52]:
# Display each grade table under its own heading.
for label, frame in (
    ("Fisica DataFrame:", df_fisica),
    ("Mate DataFrame:", df_mates),
    ("Ingles DataFrame:", df_ingles),
):
    print(label)
    frame.show()

Fisica DataFrame:
+---------+------------+
|    izena|notas_fisica|
+---------+------------+
|Jose Juan|           3|
|   Carlos|           4|
|  Nicolas|           7|
|Alejandro|           3|
|    Rocio|           5|
| Fernando|           9|
|   Isabel|           8|
|   Andres|           4|
|    Pedro|           2|
|    Oscar|           5|
| Leonardo|           6|
|    Rocio|           7|
|    Maria|           3|
|   Triana|           3|
|    Jorge|           5|
|    Ramon|           7|
|   Anabel|           2|
|   Susana|           9|
|     Rosa|           8|
+---------+------------+

Mate DataFrame:
+---------+-----------+
|    izena|notas_mates|
+---------+-----------+
|    Maria|          2|
|    Ramon|        4.5|
|    Jorge|         10|
|   Susana|          9|
|   Anabel|          8|
|    Pedro|          5|
|    Rocio|          6|
|   Carlos|          4|
|   Triana|          3|
|   Andres|          4|
| Fernando|          5|
| Leonardo|          1|
|    Oscar|          7|
|   Is

In [53]:
# Full outer join of the three grade tables on the student name ("izena"):
# a student missing from one table is still kept, with null in that grade column.
# The fold joins left to right: (fisica JOIN mates) JOIN ingles.
df_joined = reduce(
    lambda left, right: left.join(right, on="izena", how="full_outer"),
    [df_fisica, df_mates, df_ingles],
)

In [54]:
# Replace nulls with "0". The fill value is a string, so it applies to the
# string-typed columns of the joined frame.
df_joined = df_joined.na.fill("0")


In [55]:
# Add a "media" column: the mean of the three grades, rounded to 2 decimals.
# Each grade column is cast to float before summing; `round` here is the
# pyspark.sql.functions version imported at the top, not the Python builtin.
df_joined = df_joined.withColumn(
    "media",
    round(
        reduce(
            lambda total, grade: total + grade,
            [
                col(name).cast("float")
                for name in ("notas_fisica", "notas_mates", "notas_ingles")
            ],
        )
        / 3,
        2,
    ),
)


In [56]:
# Display the combined table (first 20 rows, long cell values truncated --
# the same as calling show() with no arguments).
print("Dataframe guztira:")
df_joined.show(n=20, truncate=True)

Dataframe guztira:
+---------+------------+-----------+------------+-----+
|    izena|notas_fisica|notas_mates|notas_ingles|media|
+---------+------------+-----------+------------+-----+
|Alejandro|           3|          5|           7|  5.0|
|   Anabel|           2|          8|           7| 5.67|
|   Andres|           4|          4|           6| 4.67|
|   Carlos|           4|          4|           8| 5.33|
| Fernando|           9|          5|           7|  7.0|
|   Isabel|           8|          8|           7| 7.67|
|    Jorge|           5|         10|           5| 6.67|
|Jose Juan|           3|          5|           3| 3.67|
| Leonardo|           6|          1|           4| 3.67|
|    Maria|           3|          2|           6| 3.67|
|  Nicolas|           7|          2|           5| 4.67|
|    Oscar|           5|          7|           3|  5.0|
|    Pedro|           2|          5|           0| 2.33|
|    Ramon|           7|        4.5|           8|  6.5|
|    Rocio|           5|      

In [57]:
# Stop the Spark session now that the notebook is finished.
# This also stops the underlying SparkContext referenced by `sc`.
spark.stop()