## Import

In [1]:
import os
import pyarrow.parquet as pq
import pyarrow as pa

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

## Config

### Analysis Condition

In [2]:
# Spark master URL: local mode, using 10 worker threads on this machine.
MASTER = 'local[10]'

# Spark properties that get_spark_session() applies to the session builder.
CONFIG_DICT = {
    'spark.driver.memory': '8g',
    'spark.executor.memory': '8g',
    'spark.checkpoint.compress': True,
}

### Data Path

In [3]:
# Directory holding the raw parquet dataset. Set the WEB_TOPIC_DATA_PATH
# environment variable to point the notebook at another location; when it is
# unset the original author's local path is used.
DATA_PATH = os.environ.get(
    'WEB_TOPIC_DATA_PATH',
    '/home/nika/Erasmus/Spain/courses/big data/Project/data',
)
# Output directory for the converted parquet chunks (a relative path, so it is
# resolved against the notebook's working directory).
TEMP_DIR = 'fixed_data_dir'

In [4]:
# Notebook-wide defaults for df_summary(); they are captured as the function's
# default arguments when this cell runs.
DEBUGGING_MODE = False
TRUNCATE = True


def df_summary(df, df_name=None, debugging_mode=DEBUGGING_MODE, truncate=TRUNCATE):
    """Print the schema of ``df`` and, in debugging mode, its size and a sample.

    Args:
        df: Object exposing ``printSchema()``, ``count()`` and ``show()``
            (a Spark DataFrame in this notebook).
        df_name: Optional label printed before the schema.
        debugging_mode: When truthy, also print the row count and the first
            5 rows. Both trigger Spark jobs, which is why they are opt-in.
        truncate: Forwarded to ``df.show`` as its ``truncate`` argument.
    """
    if df_name is not None:
        print(f"DataFrame {df_name}")

    df.printSchema()

    # The remaining output requires actually evaluating the DataFrame.
    if not debugging_mode:
        return

    row_count = df.count()
    print("Number of rows in df:", row_count)
    print("Sample rows:")
    df.show(5, truncate=truncate)

## Spark

In [5]:
def get_spark_session(app_name,
                      master=MASTER,
                      spark_config=CONFIG_DICT,):
    """Build (or fetch) a SparkSession configured for this notebook.

    Args:
        app_name: Application name given to the session builder.
        master: Spark master URL; defaults to the notebook-level ``MASTER``.
        spark_config: Mapping of Spark property name -> value; each entry is
            applied to the builder. It is only read, never modified.

    Returns:
        The session returned by ``SparkSession.builder.getOrCreate()``.
        NOTE(review): if a session already exists in this kernel,
        ``getOrCreate()`` hands back that session, so settings such as the
        memory sizes may not take effect without a kernel restart — confirm.
    """
    spark_builder = SparkSession.builder.master(master).appName(app_name)

    # config() returns the builder, so keep chaining on the returned object
    # instead of relying on in-place mutation.
    for key, val in spark_config.items():
        spark_builder = spark_builder.config(key, val)

    return spark_builder.getOrCreate()

In [6]:
# Create the session shared by the rest of the notebook and print the address
# of its web UI.
spark = get_spark_session(app_name="Web Topic Drifting")
print("Spark UI address:", spark.sparkContext.uiWebUrl)

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
26/01/29 08:54:08 WARN Utils: Your hostname, nikast, resolves to a loopback address: 127.0.1.1; using 10.192.93.209 instead (on interface wlp0s20f3)
26/01/29 08:54:08 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
26/01/29 08:54:09 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Spark UI address: http://10-192-93-209client.eduroam.upc.edu:4040


# Convert Data

In [8]:
def fix_large_parquet_dir_chunked(input_dir, output_dir, batch_size=500000):
    """Rewrite a parquet dataset in chunks, widening float16 columns to float32.

    Every record batch read from ``input_dir`` is written to its own file
    ``fixed_part_NNNNN.parquet`` inside ``output_dir``, so only one batch is
    held in memory at a time.

    Args:
        input_dir: Path of the source parquet dataset.
        output_dir: Directory receiving the converted chunk files; created if
            it does not exist.
        batch_size: Maximum number of rows per record batch, and therefore per
            output file.

    Side effects:
        Chunk files from an earlier run are overwritten. Files matching
        ``fixed_part_*.parquet`` that this run did not write are deleted, so a
        re-run that produces fewer chunks leaves no stale data behind for a
        reader that loads the whole directory. Other files are left untouched.
        Prints a one-line summary when done.
    """
    dataset = pq.ParquetDataset(input_dir)

    # exist_ok avoids the check-then-create race of a separate exists() test.
    os.makedirs(output_dir, exist_ok=True)

    file_index = 0
    written_names = set()

    for fragment in dataset.fragments:
        for batch in fragment.to_batches(batch_size=batch_size):

            table = pa.Table.from_batches([batch])

            # Cast float16 -> float32 (top-level columns only; every other
            # column keeps its type)
            new_schema = pa.schema([
                pa.field(f.name, pa.float32() if f.type == pa.float16() else f.type)
                for f in table.schema
            ])
            casted_table = table.cast(new_schema)

            # Write each batch into its own parquet file
            file_name = f"fixed_part_{file_index:05d}.parquet"
            pq.write_table(casted_table, os.path.join(output_dir, file_name))
            written_names.add(file_name)

            file_index += 1

    # Drop leftover chunks from a previous run that this run did not overwrite
    # (e.g. an earlier call with a smaller batch_size produced more files);
    # otherwise they would be read back as duplicate rows.
    for existing_name in os.listdir(output_dir):
        is_chunk_file = (existing_name.startswith("fixed_part_")
                         and existing_name.endswith(".parquet"))
        if is_chunk_file and existing_name not in written_names:
            os.remove(os.path.join(output_dir, existing_name))

    print(f"Done! Saved {file_index} chunk files in: {output_dir}")

# Convert the raw dataset (float16 columns -> float32) into TEMP_DIR, then
# load the converted copy with Spark.
# NOTE(review): presumably needed because Spark cannot read float16 parquet
# columns — confirm.
fix_large_parquet_dir_chunked(input_dir=DATA_PATH, output_dir=TEMP_DIR)
web_df = spark.read.parquet(TEMP_DIR)

Done! Saved 38 chunk files in: fixed_data_dir


In [9]:
# Print the column names and types of the converted dataset as loaded by Spark.
web_df.printSchema()

root
 |-- title: string (nullable = true)
 |-- text: string (nullable = true)
 |-- reddit_scores: array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- score: float (nullable = true)



In [12]:
# Show the first 10 rows with untruncated cell contents (the output is very
# wide because the text column is printed in full).
web_df.show(n=10, truncate=False)

+----------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [13]:
# Show the first 10 rows using show()'s default truncation of long cell values.
web_df.show(n=10)

+--------------------+--------------------+-------------+-----+
|               title|                text|reddit_scores|score|
+--------------------+--------------------+-------------+-----+
|FBI sought record...|The federal agent...|  [25, 31, 2]|  4.0|
|IQ, Skepticism, a...|IQ, Skepticism, a...|   [24, 2, 1]|  5.0|
|Trump can't tell ...|The king sits in ...|          [3]|  4.0|
|At least 19 wound...|Heavy Indian shel...|         [10]|  5.0|
|ABC changes 'unne...|Three bills befor...|        [116]|  5.0|
|Video: Art Studen...|﻿\n\nEight border...|       [9, 4]|  4.0|
|The junhu and the...|It's easy to make...|         [25]|  5.0|
|How Russia has hi...|How Russia has hi...| [1, 1, 1, 1]|  5.0|
|Warranty void sti...|Warranty void sti...|          [3]|  5.0|
|Kenedy, Martin Du...|Martin Dubravka, ...|         [63]|  5.0|
+--------------------+--------------------+-------------+-----+
only showing top 10 rows
