In [2]:
# Importing necessary libraries for Parquet
import pyarrow.csv as csv
import pyarrow.parquet as pq
import glob
import os
import pyarrow as pa
from pyarrow import parquet
import polars as pl
from datetime import datetime

In [35]:
# Providing folder path
folder_path = "/Users/ben/Documents/GitHub/CloudComputing/RedditPlaceData/"

# Partquet file name
parquet_file = "combined_place_data.parquet"

# Getting csv_file names
csv_files = glob.glob(os.path.join(folder_path, '**/*.csv'), recursive=True)

# Create Schema
schema = pa.schema([
    pa.field("timestamp", pa.string()),
    pa.field("user", pa.string()),
    pa.field("coordinate", pa.string()),
    pa.field("pixel_color", pa.string())
])

# Initializing writer
writer = parquet.ParquetWriter(parquet_file, schema, compression='snappy')

# Iterating through csv files
for file in sorted(csv_files):
    
    print("reading", file)
    table = csv.read_csv(file)
    # Casting table to schema
    table = table.cast(schema)

    print("writing", file)
    writer.write_table(table)
    
if writer:
    writer.close()
    
print(f"Combined CSVs to {parquet_file}")

reading /Users/ben/Documents/GitHub/CloudComputing/RedditPlaceData/2023_place_canvas_history-000000000000.csv
writing /Users/ben/Documents/GitHub/CloudComputing/RedditPlaceData/2023_place_canvas_history-000000000000.csv
reading /Users/ben/Documents/GitHub/CloudComputing/RedditPlaceData/2023_place_canvas_history-000000000001.csv
writing /Users/ben/Documents/GitHub/CloudComputing/RedditPlaceData/2023_place_canvas_history-000000000001.csv
reading /Users/ben/Documents/GitHub/CloudComputing/RedditPlaceData/2023_place_canvas_history-000000000002.csv
writing /Users/ben/Documents/GitHub/CloudComputing/RedditPlaceData/2023_place_canvas_history-000000000002.csv
reading /Users/ben/Documents/GitHub/CloudComputing/RedditPlaceData/2023_place_canvas_history-000000000003.csv
writing /Users/ben/Documents/GitHub/CloudComputing/RedditPlaceData/2023_place_canvas_history-000000000003.csv
reading /Users/ben/Documents/GitHub/CloudComputing/RedditPlaceData/2023_place_canvas_history-000000000004.csv
writing /U

In [2]:
test_file = pq.ParquetFile("/Users/ben/Documents/GitHub/CloudComputing/combined_place_data.parquet")

In [3]:
print(f"Number of entries (rows) in the Parquet file: {test_file.metadata.num_rows:,d}")

Number of entries (rows) in the Parquet file: 129,788,513


In [3]:
def parse_coordinate(coordinate):
    if len(coordinate) < 20:
        return coordinate
    else:
        coordinate = coordinate.replace(" ", "").replace(":", ",").split(",")
        x = coordinate[1]
        y = coordinate[3]
        return f"{x}" + "," + f"{y}"

In [4]:
print("Read parquet and cast datetime")
# Read the Parquet file
df = pl.scan_parquet('combined_place_data.parquet')
df = df.with_columns(pl.col("timestamp").str.strptime(pl.Datetime, '%Y-%m-%d %H:%M:%S%.f %Z').cast(pl.Datetime, strict=False))
df = df.with_columns(
    pl.col('coordinate').map_elements(parse_coordinate, return_dtype=pl.Object)
)
df = df.with_columns([
    pl.col('coordinate').map_elements(lambda x: int(x.split(",")[0]), return_dtype=pl.Int32).alias('x'),
    pl.col('coordinate').map_elements(lambda x: int(x.split(",")[1]), return_dtype=pl.Int32).alias('y')
])

Read parquet and cast datetime


In [32]:
df.head(10).collect(streaming=True)

timestamp,user,coordinate,pixel_color,x,y
datetime[μs],str,object,str,i32,i32
2023-07-20 13:00:26.088,"""no+8HEIDjbdx7/…","-199,-235","""#FFFFFF""",-199,-235
2023-07-20 13:00:43.658,"""qJ7O6cuUNfkDyn…","0,-298","""#FF4500""",0,-298
2023-07-20 13:00:43.705,"""uqi5XwkBePwcPK…","-42,-218","""#FFFFFF""",-42,-218
2023-07-20 13:01:02.487,"""rgSTj7FHZUHsLX…","-418,-232","""#B44AC0""",-418,-232
2023-07-20 13:01:40.445,"""2bmivBNj8NYvnp…",182164,"""#FF4500""",182,164
2023-07-20 13:01:51.457,"""iyPavVpo8ojDYs…","-113,-1","""#FFFFFF""",-113,-1
2023-07-20 13:01:52.149,"""a6Q+OsCSRDcPxh…","-64,-34","""#3690EA""",-64,-34
2023-07-20 13:01:57.333,"""AS0KN9rxoynWuN…","-267,-142","""#FFFFFF""",-267,-142
2023-07-20 13:02:14.260,"""aWwqNqt6Ydlvny…",-4352,"""#00A368""",-43,52
2023-07-20 13:03:42.173,"""vFCy3asEWbBER9…",-4374,"""#000000""",-43,74


In [8]:
value_counts = df.select("user").group_by("user").agg(pl.col('user').count().alias('count')).sort('count', descending=True)
value_counts = value_counts.with_columns(pl.col("count").cast(int))

In [14]:
value_counts.select("count").quantile(quantile=.90).collect(streaming = True)

count
f64
37.0


In [15]:
pbs_idx = value_counts.filter(pl.col("count") >= 37)

In [16]:
pixels_placed = df.filter(pl.col('user').is_in(pbs_idx.select("user").collect(streaming=True)))

In [17]:
pixels_placed.collect(streaming=True)

timestamp,user,coordinate,pixel_color
datetime[μs],str,str,str
2023-07-20 13:00:43.705,"""uqi5XwkBePwcPK…","""-42,-218""","""#FFFFFF"""
2023-07-20 13:01:40.445,"""2bmivBNj8NYvnp…","""182,164""","""#FF4500"""
2023-07-20 13:01:51.457,"""iyPavVpo8ojDYs…","""-113,-1""","""#FFFFFF"""
2023-07-20 13:01:52.149,"""a6Q+OsCSRDcPxh…","""-64,-34""","""#3690EA"""
2023-07-20 13:03:42.173,"""vFCy3asEWbBER9…","""-43,74""","""#000000"""
2023-07-20 13:04:03.685,"""rkTj/gktE+fk1u…","""-279,242""","""#000000"""
2023-07-20 13:04:17.971,"""sxYEVPfaohJYIY…","""60,-65""","""#000000"""
2023-07-20 13:04:38.197,"""ywG7MbcHONF6gs…","""-32,263""","""#B44AC0"""
2023-07-20 13:04:39.757,"""FdP9nI9ZB1x8f0…","""-62,280""","""#000000"""
2023-07-20 13:04:41.115,"""Sp4flue6oel2cw…","""-143,266""","""#000000"""


In [14]:
df.head(100).group_by("user").len().collect(streaming=True)

user,len
str,u32
"""8TrEI3PjtTA6rw…",1
"""HcrRBg4NZ+z333…",1
"""NrIs+gX+RYLnGS…",1
"""tIVX6MIPYuSnNN…",1
"""qJ7O6cuUNfkDyn…",1
"""VwROKxPV4+9TJE…",1
"""KF9Im+7IBADey5…",1
"""Eo/c2yPDrI95KW…",1
"""Iit2cSmPiHNisr…",1
"""GYS1Xvq9COtA8R…",1


In [15]:
df.head(10).group_by("user").agg(place_count = pl.col("coordinate").len()).collect(streaming=True)

user,place_count
str,u32
"""AS0KN9rxoynWuN…",1
"""iyPavVpo8ojDYs…",1
"""vFCy3asEWbBER9…",1
"""qJ7O6cuUNfkDyn…",1
"""aWwqNqt6Ydlvny…",1
"""uqi5XwkBePwcPK…",1
"""2bmivBNj8NYvnp…",1
"""a6Q+OsCSRDcPxh…",1
"""no+8HEIDjbdx7/…",1
"""rgSTj7FHZUHsLX…",1


In [18]:
# Aggregate df to count unique pixels and colors per user
agg_df = df.group_by('user').agg(
    color_count=pl.col('pixel_color').len(),
    place_count=pl.col('coordinate').len()
)

# Filter to find users with only one color or one place
filtered_df = agg_df.filter(
    (pl.col('color_count') == 1) | (pl.col('place_count') == 1)
)

In [19]:
filtered_df.collect(streaming=True)

user,color_count,place_count
str,u32,u32
"""e/ufMrFnYyFGIo…",1,1
"""nrdtwPkNUh/XYJ…",1,1
"""RKqipoOACpTqmD…",1,1
"""fkXDn38GzdVINe…",1,1
"""99TGCB4F+PWlXV…",1,1
"""wta65eagkJoi9d…",1,1
"""S6/kncQ7ZF174O…",1,1
"""W9Kl90kz+9Vi/l…",1,1
"""7IyI3TCTpfyA4g…",1,1
"""ILyryn/F6bnOkU…",1,1
