# Lab5

## Zadanie 2
Cel zadania: usuwanie zduplikowanych kolumn 
Użyj Generatora i stwórz dwie duże tabele po 1 milion wierszy i wykonaj dwa typy joinów, inner oraz left.  
Połącz tabele po tych samych kolumnach i użyj jednej metody z wykładów na usunięcie duplikatów.  

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType, StringType, TimestampType
import time
import random
from datetime import datetime, timedelta

In [0]:
# Spark session
spark = SparkSession.builder.appName("DuplicateColumnsRemoval").getOrCreate()

In [0]:
def generate_dataframe1(rows, partitions=8):
    """First dataframe"""
    schema = StructType([
        StructField("id", IntegerType(), False),
        StructField("value_a", DoubleType(), True),
        StructField("value_b", DoubleType(), True),
        StructField("category", StringType(), True),
        StructField("date", TimestampType(), True)
    ])
    
    # Create an RDD 
    rdd = spark.sparkContext.parallelize(range(rows), partitions)
    
    # Transform to dataframe rows
    def create_row(idx):
        return (
            idx,  # id
            random.random(),  # value_a
            random.random(),  # value_b
            random.choice(['A', 'B', 'C', 'D']),  # category
            datetime(2025, 1, 1) + timedelta(minutes=idx % 1440)  # date
        )
    
    df = rdd.map(create_row).toDF(schema)
    return df

total_rows = 1000000  
df1 = generate_dataframe1(total_rows)

print("Dataframe 1 schema:")
df1.printSchema()
print("Dataframe 1 sample:")
display(df1.head(5))

Dataframe 1 schema:
root
 |-- id: integer (nullable = false)
 |-- value_a: double (nullable = true)
 |-- value_b: double (nullable = true)
 |-- category: string (nullable = true)
 |-- date: timestamp (nullable = true)

Dataframe 1 sample:


id,value_a,value_b,category,date
0,0.8875702618912542,0.5771358000828202,D,2025-01-01T00:00:00.000+0000
1,0.7787943571019788,0.3690457872298036,A,2025-01-01T00:01:00.000+0000
2,0.075004238113118,0.550072870510849,C,2025-01-01T00:02:00.000+0000
3,0.3912165818413925,0.5978923445241089,C,2025-01-01T00:03:00.000+0000
4,0.1727151200118016,0.7363518014038383,A,2025-01-01T00:04:00.000+0000


In [0]:
def generate_dataframe2(rows, partitions=8, overlap_ratio=0.7):
    """Second df with overlap"""
    # Define schema
    schema = StructType([
        StructField("id", IntegerType(), False),
        StructField("value_a", DoubleType(), True),  # Same column name as df1 (will be duplicated after join)
        StructField("value_c", DoubleType(), True),
        StructField("category", StringType(), True),  # Same column name as df1
        StructField("timestamp", TimestampType(), True)
    ])
    
    # Create an RDD 
    rdd = spark.sparkContext.parallelize(range(rows), partitions)
    
    # Transform to dataframe
    def create_row(idx):
        shifted_id = int(idx * overlap_ratio)  # This creates overlap with df1
        return (
            shifted_id,  # id with overlap
            random.random(),  # value_a
            random.random(),  # value_c
            random.choice(['A', 'B', 'C', 'D', 'E']),  # category
            datetime(2025, 1, 1) + timedelta(hours=idx % 8760)  # timestamp
        )
    
    df = rdd.map(create_row).toDF(schema)
    return df

df2 = generate_dataframe2(total_rows)
print("Dataframe 2 schema:")
df2.printSchema()
print("Dataframe 2 sample:")
display(df2.head(50))

Dataframe 2 schema:
root
 |-- id: integer (nullable = false)
 |-- value_a: double (nullable = true)
 |-- value_c: double (nullable = true)
 |-- category: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)

Dataframe 2 sample:


id,value_a,value_c,category,timestamp
0,0.4334738851922334,0.7646395897437299,C,2025-01-01T00:00:00.000+0000
0,0.7198954296119107,0.5146007606611811,A,2025-01-01T01:00:00.000+0000
1,0.2180263314609322,0.1779436812534219,A,2025-01-01T02:00:00.000+0000
2,0.148317689026929,0.0414778507077362,E,2025-01-01T03:00:00.000+0000
2,0.6114363245160213,0.6174670345395866,C,2025-01-01T04:00:00.000+0000
3,0.8182952833125602,0.4200522508353166,E,2025-01-01T05:00:00.000+0000
4,0.2172854547319838,0.4189049397206389,B,2025-01-01T06:00:00.000+0000
4,0.7959007381578439,0.5048721133557013,D,2025-01-01T07:00:00.000+0000
5,0.7951612336380187,0.7486284024340703,A,2025-01-01T08:00:00.000+0000
6,0.2018089642072271,0.7233667508325967,C,2025-01-01T09:00:00.000+0000


### Inner join


In [0]:
# INNER JOIN
print("\nPerforming INNER JOIN...")
start_time = time.time()
inner_join_result = df1.join(df2, on="id", how="inner").drop(df2.value_a).drop(df2.category).drop(df2.timestamp).dropDuplicates(["id"]).select(["id", "category", "value_a", "value_b", "value_c", "date"]).sort("id")

display(inner_join_result.head(50))


Performing INNER JOIN...


id,category,value_a,value_b,value_c,date
0,C,0.0410502374087849,0.2231811148964605,0.4608789060871507,2025-01-01T00:00:00.000+0000
1,A,0.7273159464297311,0.5154317483583815,0.1843343016804625,2025-01-01T00:01:00.000+0000
2,C,0.7222758747513638,0.6388533301395048,0.7373423427813063,2025-01-01T00:02:00.000+0000
3,A,0.0572368423885696,0.0563693304078204,0.2784320638120869,2025-01-01T00:03:00.000+0000
4,C,0.3239774521792048,0.7330026494128156,0.9845326320810251,2025-01-01T00:04:00.000+0000
5,C,0.3915819829047006,0.3102805634951876,0.507791117975974,2025-01-01T00:05:00.000+0000
6,D,0.8913985981944376,0.8463527582922517,0.2506071037810851,2025-01-01T00:06:00.000+0000
7,D,0.969286936468024,0.0216551793457357,0.2408826436907987,2025-01-01T00:07:00.000+0000
8,D,0.8014046326641912,0.2537828612369761,0.0349669808346734,2025-01-01T00:08:00.000+0000
9,D,0.9883247948928108,0.377214010862293,0.5613484189900836,2025-01-01T00:09:00.000+0000


In [0]:
# LEFT JOIN

# INNER JOIN
print("\nPerforming INNER JOIN...")
start_time = time.time()
left_join_result = df1.join(df2, on="id", how="left").drop(df2.value_a).drop(df2.category).drop(df2.timestamp).dropDuplicates(["id"]).select(["id", "category", "value_a", "value_b", "value_c", "date"]).sort("id")

display(left_join_result.head(50))


Performing INNER JOIN...


id,category,value_a,value_b,value_c,date
0,C,0.1775817526999521,0.1818023210486566,0.7370999088251613,2025-01-01T00:00:00.000+0000
1,B,0.5313337127047917,0.933390190566002,0.6134935888030459,2025-01-01T00:01:00.000+0000
2,C,0.1763182262600212,0.6767126765397841,0.1005885187062346,2025-01-01T00:02:00.000+0000
3,C,0.1587853852770651,0.6336646285464592,0.2464832911743961,2025-01-01T00:03:00.000+0000
4,C,0.801927726252587,0.0454036066403441,0.761282553977124,2025-01-01T00:04:00.000+0000
5,C,0.6816752195049517,0.5928874753864654,0.4059188091535261,2025-01-01T00:05:00.000+0000
6,D,0.4963742787444728,0.4471657977874285,0.4889193765846521,2025-01-01T00:06:00.000+0000
7,C,0.968516802167712,0.7385233537297544,0.6198241791342156,2025-01-01T00:07:00.000+0000
8,B,0.3223250885335268,0.0517966648979726,0.502313559382153,2025-01-01T00:08:00.000+0000
9,B,0.4413833898405191,0.5610506158453856,0.8163029714387798,2025-01-01T00:09:00.000+0000
