In [1]:
# Display the active SparkSession. `spark` is not created anywhere in this
# notebook; it is presumably provided by the kernel — confirm for your environment.
spark

## Processing `Votes.xml`

In [1]:
import html
import re
from datetime import datetime

import pyspark.sql.functions as F
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, TimestampType, LongType

In [2]:
# Input location of the raw Votes.xml file.
dataset_bucket = 's3://stackoverflow-dataset-2023/dataset/raw/2023/'
# rstrip('/') keeps the joined key free of a '//' when the bucket path already
# ends with a slash (as it does here).
dataset_file = f"{dataset_bucket.rstrip('/')}/Votes.xml"

In [3]:
# Load Votes.xml as an RDD of text lines (one element per line of the file).
rdd = spark.sparkContext.textFile(name=dataset_file)


In [4]:
# Attributes extracted from each `<row ... />` element, in output-column order.
# The tuple returned by `row_parser` follows this order, so it must stay aligned
# with the DataFrame schema built from it.
# NOTE(review): TagName, Count, ExcerptPostId and WikiPostId look like Tags.xml
# attributes; for Votes rows they presumably always stay None — confirm before dropping.
VOTE_FIELDS = (
    "Id",
    "PostId",
    "VoteTypeId",
    "UserId",
    "TagName",
    "Count",
    "CreationDate",
    "ExcerptPostId",
    "WikiPostId",
    "BountyAmount",
)

# One XML attribute of the form  name="value"  (values never contain a raw '"';
# XML encodes it as &quot;).
_ATTRIBUTE_PATTERN = re.compile(r'([\w:.-]+)\s*=\s*"([^"]*)"')


def row_parser(row):
    """Parse the attribute text of one ``<row ... />`` element into a tuple.

    Parameters
    ----------
    row : str
        Attribute text with the surrounding tag markup removed, e.g.
        ``'Id="1" PostId="1" VoteTypeId="2" CreationDate="2008-07-31T00:00:00.000"'``.

    Returns
    -------
    tuple
        One entry per name in ``VOTE_FIELDS``, in that order. Attributes that are
        absent are ``None``. ``CreationDate`` is parsed into a ``datetime``; every
        other value is the whitespace-stripped, XML-unescaped string. Attributes not
        listed in ``VOTE_FIELDS`` are ignored, so the tuple width always matches the
        schema.

    Raises
    ------
    ValueError
        If ``CreationDate`` does not match ``%Y-%m-%dT%H:%M:%S.%f``.
    """
    values = dict.fromkeys(VOTE_FIELDS)

    for name, raw_value in _ATTRIBUTE_PATTERN.findall(row):
        # Unknown attributes would otherwise widen the tuple and break toDF().
        if name not in values:
            continue
        # Attribute values are XML-escaped (&amp;, &quot;, &#xA;, ...).
        value = html.unescape(raw_value.strip())
        if name == "CreationDate":
            values[name] = datetime.strptime(value, "%Y-%m-%dT%H:%M:%S.%f")
        else:
            values[name] = value

    return tuple(values.values())

In [5]:
def _row_attributes(line):
    """Return the attribute text of one stripped ``<row ... />`` line, without tag markup.

    Only the leading ``<row`` and a trailing ``/>`` are removed, so this works
    whether or not a space precedes ``/>``. A fixed ``[4:-3]`` slice would cut the
    closing quote of the last attribute when there is no space.
    """
    attributes = line[len("<row"):]
    if attributes.endswith("/>"):
        attributes = attributes[:-len("/>")]
    return attributes.strip()


# Keep only lines that start with `<row` (every other line is dropped), strip the
# tag markup, and parse each attribute string into a tuple.
parsed_rdd = rdd.map(lambda row: row.strip()) \
   .filter(lambda row: row.startswith("<row")) \
   .map(_row_attributes) \
   .map(row_parser)

In [6]:
# Number of parsed `<row>` records; this action evaluates the whole pipeline.
parsed_row_count = parsed_rdd.count()
parsed_row_count

                                                                                

228077281

In [5]:
# NOTE(review): repeats the count in the cell above. Its saved output (224535501)
# disagrees with the other saved counts (228077281), so it is presumably stale
# from an earlier run — consider deleting this cell.
parsed_rdd_total = parsed_rdd.count()
parsed_rdd_total

                                                                                

224535501

In [7]:
# Schema for the tuples produced by `row_parser`, column for column. Everything is
# kept as a nullable string except CreationDate, which the parser already turns
# into a datetime.
schema_votes = StructType(
    [StructField(name, StringType()) for name in ("Id", "PostId", "VoteTypeId", "UserId", "TagName", "Count")]
    + [StructField("CreationDate", TimestampType())]
    + [StructField(name, StringType()) for name in ("ExcerptPostId", "WikiPostId", "BountyAmount")]
)

# Turn the RDD of tuples into a DataFrame using that explicit schema.
df = parsed_rdd.toDF(schema=schema_votes)

In [8]:
# Inspect the schema: every column is a string except CreationDate (timestamp).
df.printSchema()

root
 |-- Id: string (nullable = true)
 |-- PostId: string (nullable = true)
 |-- VoteTypeId: string (nullable = true)
 |-- UserId: string (nullable = true)
 |-- TagName: string (nullable = true)
 |-- Count: string (nullable = true)
 |-- CreationDate: timestamp (nullable = true)
 |-- ExcerptPostId: string (nullable = true)
 |-- WikiPostId: string (nullable = true)
 |-- BountyAmount: string (nullable = true)



In [9]:
# Preview the first 20 rows; long cell values are truncated in the display.
df.show(n=20, truncate=True)

[Stage 1:>                                                          (0 + 1) / 1]

+---+------+----------+------+-------+-----+-------------------+-------------+----------+------------+
| Id|PostId|VoteTypeId|UserId|TagName|Count|       CreationDate|ExcerptPostId|WikiPostId|BountyAmount|
+---+------+----------+------+-------+-----+-------------------+-------------+----------+------------+
|  1|     1|         2|  null|   null| null|2008-07-31 00:00:00|         null|      null|        null|
|  2|     3|         2|  null|   null| null|2008-07-31 00:00:00|         null|      null|        null|
|  3|     2|         2|  null|   null| null|2008-07-31 00:00:00|         null|      null|        null|
|  4|     4|         2|  null|   null| null|2008-07-31 00:00:00|         null|      null|        null|
|  5|     6|         2|  null|   null| null|2008-07-31 00:00:00|         null|      null|        null|
|  6|     7|         2|  null|   null| null|2008-07-31 00:00:00|         null|      null|        null|
|  7|    13|         2|  null|   null| null|2008-08-01 00:00:00|         

                                                                                

In [12]:
# Cast the numeric id/count columns from string to int in a single projection.
# Column order is preserved; UserId, TagName and CreationDate pass through unchanged.
# NOTE(review): UserId stays a string while the other id columns become int —
# confirm that is intended.
df = df.select(*[
    F.col(column).cast('int').alias(column)
    if column in ('Id', 'PostId', 'VoteTypeId', 'Count', 'ExcerptPostId', 'WikiPostId', 'BountyAmount')
    else F.col(column)
    for column in df.columns
])

df.count()

                                                                                

228077281

In [13]:
# Confirm the casts: the id/count columns are now integer; UserId and TagName stay strings.
df.printSchema()

root
 |-- Id: integer (nullable = true)
 |-- PostId: integer (nullable = true)
 |-- VoteTypeId: integer (nullable = true)
 |-- UserId: string (nullable = true)
 |-- TagName: string (nullable = true)
 |-- Count: integer (nullable = true)
 |-- CreationDate: timestamp (nullable = true)
 |-- ExcerptPostId: integer (nullable = true)
 |-- WikiPostId: integer (nullable = true)
 |-- BountyAmount: integer (nullable = true)



In [None]:
# NOTE(review): repeats the count at the end of the cast cell and was never
# executed in the saved notebook — consider deleting.
vote_row_count = df.count()
vote_row_count

In [14]:
# Output location for the processed Votes table.
output_bucket = 's3://stackoverflow-dataset-2023/dataset/raw-processed/2023'
output_folder_name = f"{output_bucket}/Votes-parquet"

# Save the DataFrame as Parquet, replacing any previous output at that path.
# Parquet files store their own schema, so no CSV-style `header` option is needed.
df.write \
  .format('parquet') \
  .mode('overwrite') \
  .save(output_folder_name)

                                                                                