In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, ArrayType, IntegerType, DateType, LongType, DoubleType

import pyspark
from pyspark.sql import functions as F
from pyspark.sql.functions import lit, when, col
import pandas as pd
from pyspark.sql import Window

from pyspark.ml.feature import Bucketizer
from pyspark import StorageLevel


import os


In [4]:
# Create (or reuse) the notebook's Spark session, requesting 2g of driver memory.
session_builder = SparkSession.builder.appName("Preprocessing SDM")
session_builder = session_builder.config("spark.driver.memory", "2g")
spark = session_builder.getOrCreate()

your 131072x1 screen size is bogus. expect trouble
24/04/10 21:22:47 WARN Utils: Your hostname, MyLaptop resolves to a loopback address: 127.0.1.1; using 172.28.44.164 instead (on interface eth0)
24/04/10 21:22:47 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/04/10 21:22:48 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [5]:
# Folder that holds the DBLP CSV exports. Set the SDM_DATA_DIR environment
# variable to point the notebook at another location; when it is unset the
# original hard-coded path is used.
root_directory = os.environ.get("SDM_DATA_DIR", "/mnt/c/MDS/Q2/SDM/data/data")

### Read data

#### Article

In [6]:
def _create_schema(headers):
    """Build a Spark ``StructType`` from ``(column, type tag)`` header pairs.

    Every item of ``headers`` must unpack into exactly two values: the column
    name and its declared type tag. The tags ``int`` and ``ID`` become
    ``LongType``; any other tag becomes ``StringType``. When the column name
    is the empty string, the type tag is used as the column name instead.
    All fields are declared nullable.
    """
    long_tags = ('int', 'ID')
    fields = []
    for name, tag in headers:
        dtype = LongType() if tag in long_tags else StringType()
        # Unnamed columns (e.g. a bare ":ID" header) are named after their tag.
        field_name = tag if name == '' else name
        fields.append(StructField(field_name, dtype, True))
    return StructType(fields)


def read_data_w_sep_headers(name_data: str = "dblp_www"):
    """Read ``<name_data>.csv`` using the schema described in ``<name_data>_header.csv``.

    Both files are looked up under ``root_directory`` and are ``;``-delimited.
    The header file's column labels are split on ``:`` into (name, type tag)
    pairs and turned into a schema by ``_create_schema``; the data file is
    then read with that schema and no header row.

    Returns the resulting Spark DataFrame.
    """
    header_file = f"{root_directory}/{name_data}_header.csv"
    data_file = f"{root_directory}/{name_data}.csv"

    column_labels = pd.read_csv(header_file, delimiter=";").columns
    schema = _create_schema(str(label).split(":") for label in column_labels)

    return spark.read.schema(schema).options(delimiter=";").csv(data_file)

def read_data_w_embedded_headers(name_data: str = "dblp_www"):
    """Read ``<name_data>.csv`` whose first line is its own typed header.

    The file is looked up under ``root_directory`` and is ``;``-delimited.
    pandas reads just enough of the file (``nrows=1``) to obtain the column
    labels, which are split on ``:`` into (name, type tag) pairs and turned
    into a schema by ``_create_schema``. Spark then reads the file with that
    schema, treating the first line as a header.

    Returns the resulting Spark DataFrame.
    """
    data_file = f"{root_directory}/{name_data}.csv"

    column_labels = pd.read_csv(data_file, delimiter=";", nrows=1).columns
    schema = _create_schema(str(label).split(":") for label in column_labels)

    reader = spark.read.schema(schema).options(delimiter=";", header="true")
    return reader.csv(data_file)


def create_fake_email(df: pyspark.sql.DataFrame):
    """Add a synthetic ``email`` column derived from the ``name`` column.

    Two columns are appended to ``df``:

    * ``cleaned_name`` - ``name`` lower-cased, with every character that is
      not a letter, digit or whitespace removed.
    * ``email`` - ``cleaned_name`` with each space replaced by a dot,
      followed by ``@gmail.com``.

    Returns the DataFrame with both columns added.
    """
    # Raw string: "\s" is not a valid Python escape sequence, so the non-raw
    # form triggers an invalid-escape warning. The regex itself is unchanged.
    cleaned_name = F.regexp_replace(F.lower("name"), r"[^a-zA-Z0-9\s]", "")
    email = F.concat(F.regexp_replace("cleaned_name", " ", "."), lit("@gmail.com"))

    return (
        df
        .withColumn("cleaned_name", cleaned_name)
        .withColumn("email", email)
    )


    

In [7]:
# Load every DBLP entity whose header lives in a separate "<name>_header.csv" file.
(
    article, book, data, incollection, inproceedings,
    mastersthesis, phdthesis, proceedings, www,
) = [
    read_data_w_sep_headers(name_data=f"dblp_{entity}")
    for entity in (
        "article", "book", "data", "incollection", "inproceedings",
        "mastersthesis", "phdthesis", "proceedings", "www",
    )
]


In [8]:
# Sampling fraction that keeps roughly 5,000 of the loaded articles.
n_articles_total = article.count()
prop_data = 5_000 / n_articles_total

                                                                                

In [9]:
# Keep each article with probability prop_data (random sample, no fixed seed).
article = article.where(F.rand() < prop_data)

In [10]:
# Freeze the random sample on disk so every later cell sees the same articles.
# The checkpoint is written only when no completed one exists (Spark's
# _SUCCESS marker); otherwise the existing sample is reused as-is. This keeps
# the notebook runnable from a fresh environment, where "checkpoint3" has not
# been produced yet.
# NOTE(review): assumes Spark resolves "checkpoint3" on the local filesystem
# relative to the notebook's working directory - confirm for cluster runs.
if not os.path.exists(os.path.join("checkpoint3", "_SUCCESS")):
    article.write.mode("overwrite").parquet("checkpoint3")
article = spark.read.parquet("checkpoint3")

In [11]:
# Load the DBLP files that carry their typed header on the first line.
author, authored_by, journal, journal_published_in = [
    read_data_w_embedded_headers(name_data=f"dblp_{entity}")
    for entity in ("author", "author_authored_by", "journal", "journal_published_in")
]


In [12]:
# Parallel lists: the separate-header DataFrames and their short names, in the same order.
tables = [
    article,
    book,
    data,
    incollection,
    inproceedings,
    mastersthesis,
    phdthesis,
    proceedings,
    www,
]
name_tables = [
    "article",
    "book",
    "data",
    "incollection",
    "inproceedings",
    "mastersthesis",
    "phdthesis",
    "proceedings",
    "www",
]

#### Creation of authors data

In [13]:
# One row per (author, article): the "author" column holds names separated by
# "|". Raw string because "\|" is not a valid Python escape sequence; the
# regex passed to Spark is unchanged.
article_authors_relation = article.select(
    F.explode(F.split("author", r"\|")).alias("author_name"),
    col("article").alias("article_id"),
)

In [14]:
# Attach a generated 64-bit id to every row of the relation.
article_authors_relation = article_authors_relation.select(
    "*",
    F.monotonically_increasing_id().alias("author_id"),
)

In [15]:

# The relation carries one generated id per (author, article) row, so an author
# with several articles would otherwise show up as several distinct authors.
# Collapse to a single id per author name (the smallest generated one) and
# propagate it back to the relation so both tables agree on the id.
author_ids = (
    article_authors_relation
    .groupBy("author_name")
    .agg(F.min("author_id").alias("author_id"))
)
article_authors_relation = (
    article_authors_relation
    .drop("author_id")
    .join(author_ids, on="author_name", how="left")
)

# One row per author, with a synthetic e-mail address.
authors = author_ids.select(col("author_name").alias("name"), "author_id")
authors_data = create_fake_email(df=authors)

#### Affiliation data

In [16]:
# Distinct PhD-thesis schools, each tagged with a random number in 0..19;
# only one school is kept per number.
schools = (
    phdthesis
    .select("school")
    .dropDuplicates()
    .withColumn("number", F.floor(F.rand()*20))
    .dropDuplicates(['number'])
)


In [17]:
# Give every author a random number in 0..19 and attach the school carrying
# that number (left join: authors stay even when no school has the number).
authors_with_number = authors_data.select(
    "author_id",
    F.floor(F.rand()*20).alias("number"),
)
affiliation_data = (
    authors_with_number
    .join(schools, on="number", how="left")
    .drop("number")
)

#### Creation of article data

In [18]:
# Article attributes kept for the output; "article" and "author" are renamed.
article_columns = [
    col("article").alias("id"),
    col("author").alias("author_name"),
    "journal",
    "title",
    "url",
    "volume",
    "year",
]
article_data = article.select(*article_columns)

#### Create citations

In [19]:
# Distinct (id, year) pairs are the pool from which citation pairs are drawn.
citations = article_data.select("id","year").dropDuplicates()

# Citing side: each article gets a random bucket number, exp(rand()) rounded
# to 3 decimals. Persisted to disk so the draw is computed once.
citationsA = (
    citations
    .withColumn("number",  F.round(F.exp(F.rand()),3))
    .persist(StorageLevel.DISK_ONLY)
)

# Cited side: same pool with renamed columns and an independent random draw.
citationsB = (
    citations
    .select(col("id").alias("id_cited"), col("year").alias("year_cited"))
    .withColumn("number",  F.round(F.exp(F.rand()),3))
    .persist(StorageLevel.DISK_ONLY)
)


In [20]:
# A citation links a citing article (citationsA: id, year) to a cited one
# (citationsB: id_cited, year_cited). Pairs are formed when both sides drew
# the same random bucket number. The cited article must not be newer than the
# citing one (the original comparison was inverted), and an article must not
# cite itself.
cond = (
    (citationsB['number'] == citationsA['number'])
    & (citationsB['year_cited'] <= citationsA['year'])
    & (citationsB['id_cited'] != citationsA['id'])
)

In [21]:
# Pair citing and cited articles on the join condition, then drop the helper draw columns.
citation_pairs = citationsA.join(citationsB, on=cond, how="inner")
citations_data = citation_pairs.drop("number")

#### Creation of journal data

In [22]:
# Distinct journal names appearing in the sampled articles.
journal_data = article.select("journal").dropDuplicates()

In [23]:
# Rank the 200 journals with the most distinct articles. Rank 1 is the journal
# with the most articles (the window previously ordered ascending, which gave
# rank 1 to the smallest of the top 200). The journal name breaks ties so the
# ranking is reproducible. The window has no partition, which is acceptable
# here because it runs over at most 200 rows.
w = Window.orderBy(col("n_articles").desc(), col("journal").asc())
journal_rank = (
    article
    .groupBy("journal")
    .agg(F.countDistinct("article").alias("n_articles"))
    .orderBy(col("n_articles").desc())
    .limit(200)
    .withColumn("rank", F.row_number().over(w))
    .select("rank", "journal")
)

In [24]:
# Attach the rank where one exists; journals outside the ranked set keep a null rank.
journal_data_w_rank = journal_data.join(
    journal_rank,
    on="journal",
    how="left",
)

#### Creation of time data

In [25]:
# Distinct publication years of the sampled articles.
time = (
    article
    .select("year")
    .dropDuplicates()
)

### Creation of conferences information

In [26]:
# Split the url on "/" once and reuse it: element 1 is taken as the record
# type and element 2 as the conference name. Raw string because "\/" is not a
# valid Python escape sequence; the regex passed to Spark is unchanged.
url_parts = F.split("url", r"\/")

# Keep only conference records; an edition is "<conference_name>-<year>".
proceedings_ = (
    proceedings
    .withColumn("type", url_parts[1])
    .withColumn("conference_name", url_parts[2])
    .withColumn("edition", F.concat_ws("-","conference_name", "year"))
    .filter(col("type") == 'conf')
    .select("type","conference_name","edition", "editor", "year")
)

In [27]:
# Split the url on "/" once and reuse it: element 1 is taken as the record
# type and element 2 as the conference name. Raw string because "\/" is not a
# valid Python escape sequence; the regex passed to Spark is unchanged.
url_parts = F.split("url", r"\/")

# Keep only conference records; an edition is "<conference_name>-<year>".
inproceedings_ = (
    inproceedings
    .withColumn("type", url_parts[1])
    .withColumn("conference_name", url_parts[2])
    .withColumn("edition", F.concat_ws("-","conference_name", "year"))
    .filter(col("type") == 'conf')
    .select("type","conference_name","edition", "editor", "year")
)

In [28]:
# Stack both conference sources; union is positional and both frames select the same columns in the same order.
conference_information = proceedings_.union(
    inproceedings_
)

In [29]:
# Materialise the conference table to parquet and continue from the stored copy.
checkpoint_writer = conference_information.write.mode("overwrite")
checkpoint_writer.parquet("checkpoint")
conference_information = spark.read.parquet("checkpoint")



                                                                                

In [30]:
# Pool of 50 host-city labels. Some cities appear more than once, which makes
# them proportionally more likely to be drawn.
cities = [
    "New York City, USA",
    "London, UK",
    "Tokyo, Japan",
    "Paris, France",
    "Los Angeles, USA",
    "Beijing, China",
    "Moscow, Russia",
    "Istanbul, Turkey",
    "Sao Paulo, Brazil",
    "Cairo, Egypt",
    "Mumbai, India",
    "Mexico City, Mexico",
    "Seoul, South Korea",
    "Jakarta, Indonesia",
    "Karachi, Pakistan",
    "Buenos Aires, Argentina",
    "Delhi, India",
    "Shanghai, China",
    "Manila, Philippines",
    "Dhaka, Bangladesh",
    "Moscow, Russia",
    "Istanbul, Turkey",
    "Tianjin, China",
    "Rio de Janeiro, Brazil",
    "Lagos, Nigeria",
    "Lima, Peru",
    "Bangkok, Thailand",
    "Jakarta, Indonesia",
    "Cairo, Egypt",
    "Bogota, Colombia",
    "Kinshasa, Democratic Republic of the Congo",
    "Seoul, South Korea",
    "Dhaka, Bangladesh",
    "Karachi, Pakistan",
    "Tokyo, Japan",
    "Manila, Philippines",
    "Guangzhou, China",
    "Mumbai, India",
    "Istanbul, Turkey",
    "Moscow, Russia",
    "Sao Paulo, Brazil",
    "Beijing, China",
    "Lahore, Pakistan",
    "Shenzhen, China",
    "Chongqing, China",
    "Chengdu, China",
    "Lahore, Pakistan",
    "Kinshasa, Democratic Republic of the Congo",
    "Bangalore, India",
    "Taipei, Taiwan",
]


In [31]:
# Turn the Python list into a one-column DataFrame and number the rows 1..N in
# alphabetical city order. Note that this rebinds `cities` from a list to a DataFrame.
w = Window.orderBy("city")
city_rows = [(city,) for city in cities]
cities = (
    spark
    .createDataFrame(city_rows, ["city"])
    .withColumn("number", F.row_number().over(w))
)

In [32]:
# Assign each conference record a random city, then keep one record per edition.
# `cities.number` comes from row_number() and is therefore 1-based, so the
# 0-based floor(rand() * N) draw is shifted by one. Without the shift, a draw
# of 0 matched no city (null) and the last city could never be chosen. N is
# taken from the table itself instead of a hard-coded 50.
n_cities = cities.count()

conference_information_w_random_city = (
    conference_information
    .withColumn("number", F.floor(F.rand() * n_cities) + 1)
    .join(F.broadcast(cities), on="number", how="left")
    .dropDuplicates(['edition'])
)

#### Create random link between article data and conference edition

In [33]:
# Only the edition key and its year are needed to link articles to editions.
conf_info_random = conference_information_w_random_city.select(
    "edition",
    "year",
)

In [34]:
# Materialise the edition/year pairs to parquet and continue from the stored copy.
checkpoint_writer = conf_info_random.write.mode("overwrite")
checkpoint_writer.parquet("checkpoint2")
conf_info_random = spark.read.parquet("checkpoint2")



                                                                                

In [35]:
# Within each article, candidate editions are shuffled randomly.
w = Window.partitionBy("id").orderBy(F.rand())

# Every article is paired with all editions of its publication year (left join,
# so articles without a matching year are kept) ...
candidate_editions = article_data.join(
    F.broadcast(conf_info_random),
    on=["year"],
    how="left",
)

# ... and only the first candidate in the random order is retained.
ranked_candidates = candidate_editions.withColumn("row", F.row_number().over(w))
article_data_w_conf_info = ranked_candidates.filter(col("row")==1).drop("row")

In [36]:
# Keep only the editions that at least one article was linked to. The edition
# keys are de-duplicated first: several articles can share an edition, and
# joining against the raw column would repeat the conference row once per article.
linked_editions = article_data_w_conf_info.select("edition").dropDuplicates()

conference_information_w_random_city = conference_information_w_random_city.join(
    linked_editions,
    on="edition",
    how="inner",
)

#### Create conference data

In [37]:
# Distinct conference names among the retained editions.
conference_data = (
    conference_information_w_random_city
    .select("conference_name")
    .dropDuplicates()
)

#### Create edition data

In [38]:
# Edition attributes; the edition key is exposed as "conference_edition".
edition_columns = [
    "conference_name",
    col("edition").alias("conference_edition"),
    "city",
    "year",
]
edition_data = conference_information_w_random_city.select(*edition_columns)

#### Create city

In [39]:
# City table: one row per distinct city. The source pool repeats some cities,
# so dropping the helper "number" column alone would leave duplicate city rows.
city_data = cities.select("city").dropDuplicates()

#### Create random link between reviewers and articles

In [40]:
# Number of rows in the authors table; used as the range for random reviewer draws.
n_author_rows = authors_data.count()
num = n_author_rows

In [41]:
# Shuffle the authors: row_number() over a random ordering gives each author a
# number starting at 1.
w = Window.orderBy(F.rand())

authors_rand = (
    authors_data
    .withColumn("number", F.row_number().over(w))
    .select("number","author_id")
    .dropDuplicates(['number'])
)

In [42]:
def _random_review_round():
    """Draw one random reviewer (with a verdict) for every article.

    Each article gets a random number that is matched against
    ``authors_rand.number``. That column comes from ``row_number()`` and is
    1-based, so the 0-based ``floor(rand() * num)`` draw is shifted by one;
    without the shift a draw of 0 matched no author (null reviewer) and the
    last author could never be picked. The verdict is "Yes" when
    ``rand() * 4 > .5`` and "No" otherwise. The result is persisted to disk
    so the random draw is computed once.
    """
    return (
        article_data
        .select("id")
        .withColumn("number", F.floor(F.rand()*num) + 1)
        .join(authors_rand, on="number", how="left")
        .withColumn("approved", when(F.rand()*4 > .5, "Yes").otherwise("No"))
        .persist(StorageLevel.DISK_ONLY)
    )


# Three independent rounds, i.e. three reviewer draws per article.
relation1 = _random_review_round()
relation2 = _random_review_round()
relation3 = _random_review_round()

relation_author_reviewer = relation1.union(relation2).union(relation3).drop("number")

24/04/10 21:23:25 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/04/10 21:23:25 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/04/10 21:23:25 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/04/10 21:23:25 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/04/10 21:23:25 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/04/10 21:23:25 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


#### Communities data

In [43]:
# Names of the research communities (fixed the "Data Sciene" typo).
communities = ['Big Data', "AI", "Data Science", "Hardware", "Computer Systems", "Database", "Web"]

In [44]:
# Community labels for the articles that are not assigned to "database".
categories = ['Big Data', 'AI', 'Data Science', 'Hardware', 'Computer Systems', 'Web']
num_categories = len(categories)

# One uniform random draw per article.
df_with_random = article_data.select("id").withColumn("random", F.rand())

# Equal-width bucket edges over [0, 1]: one bucket per label.
splits = [float(i) / num_categories for i in range(num_categories + 1)]

# Map each draw to the index of the bucket it falls into.
bucketizer = Bucketizer(splits=splits, inputCol="random", outputCol="categoryIndex")
df_with_category_index = bucketizer.transform(df_with_random)

# An independent draw sends rows to "database" when it is below .9; the
# remaining rows take the label of their bucket, the last label being the fallback.
category_expr = when(F.rand() <.9, "database")
for bucket_index, bucket_label in enumerate(categories[:-1]):
    category_expr = category_expr.when(col("categoryIndex") == bucket_index, bucket_label)
category_expr = category_expr.otherwise(categories[-1])

df_with_category = df_with_category_index.withColumn("category", category_expr)

In [45]:
# Final article -> community mapping, without the helper columns.
communities_data = df_with_category.select(
    "id",
    "category",
)

### Persist to disk

In [46]:
# Output folder name (under ../temporal_zone) -> DataFrame to store there.
# Written in this order, each one overwriting any previous copy.
parquet_outputs = {
    "authors_data": authors_data,
    "article_data": article_data_w_conf_info,
    "article_authors_relation": article_authors_relation,
    "journal_data_w_rank": journal_data_w_rank,
    "time": time,
    "conference_data": conference_data,
    "edition_data": edition_data,
    "city_data": city_data,
    "reviews_data": relation_author_reviewer,
    "citations_data": citations_data,
    "communities_data": communities_data,
    "affiliation_data": affiliation_data,
}

for folder_name, frame in parquet_outputs.items():
    frame.write.mode("overwrite").parquet(f"../temporal_zone/{folder_name}")

24/04/10 21:23:52 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/04/10 21:23:52 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/04/10 21:23:52 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/04/10 21:23:52 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/04/10 21:23:53 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/04/10 21:23:53 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/04/10 2

In [47]:

def delete_quotes(df):
    """Return ``df`` with double-quote characters removed from its string columns.

    Columns whose data type equals ``StringType()`` have every ``"`` replaced
    by the empty string; all other columns are passed through unchanged.
    Column names and column order are preserved.
    """
    def _cleaned(column):
        # Only string columns can contain quote characters to strip.
        if df.schema[column].dataType == StringType():
            return F.regexp_replace(col(column), '"', '').alias(column)
        return col(column)

    return df.select(*[_cleaned(column) for column in df.columns])

In [48]:
import shutil
import os

# Parquet outputs to convert into a single CSV file each
file_paths = [
    "../temporal_zone/authors_data",
    "../temporal_zone/article_data",
    "../temporal_zone/article_authors_relation",
    "../temporal_zone/journal_data_w_rank",
    "../temporal_zone/time",
    "../temporal_zone/conference_data",
    "../temporal_zone/edition_data",
    "../temporal_zone/city_data",
    "../temporal_zone/reviews_data",
    "../temporal_zone/citations_data",
    "../temporal_zone/communities_data",
    "../temporal_zone/affiliation_data"

]

for file_path in file_paths:
    tmp_dir = f"{file_path}_tmp"

    # Spark writes a folder of part files; coalesce(1) forces a single part.
    (
        spark.read.parquet(file_path)
        .transform(delete_quotes)
        .coalesce(1)
        .write.mode("overwrite")
        .options(header="true")
        .csv(tmp_dir)
    )

    # Locate the part file. Fail with a clear message when Spark produced none,
    # instead of trying to rename a literal "*.csv" path.
    part_files = [name for name in os.listdir(tmp_dir) if name.endswith(".csv")]
    if not part_files:
        raise FileNotFoundError(f"No CSV part file was written to {tmp_dir}")

    # os.replace overwrites an existing target on every platform, so the cell
    # can be re-run safely.
    os.replace(os.path.join(tmp_dir, part_files[0]), f"{file_path}.csv")

    # Delete the temporary folder
    shutil.rmtree(tmp_dir)

                                                                                

----------------------------------------
Exception occurred during processing of request from ('127.0.0.1', 36672)
Traceback (most recent call last):
  File "/root/miniconda3/lib/python3.11/socketserver.py", line 317, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/root/miniconda3/lib/python3.11/socketserver.py", line 348, in process_request
    self.finish_request(request, client_address)
  File "/root/miniconda3/lib/python3.11/socketserver.py", line 361, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/root/miniconda3/lib/python3.11/socketserver.py", line 755, in __init__
    self.handle()
  File "/root/miniconda3/lib/python3.11/site-packages/pyspark/accumulators.py", line 295, in handle
    poll(accum_updates)
  File "/root/miniconda3/lib/python3.11/site-packages/pyspark/accumulators.py", line 267, in poll
    if self.rfile in r and func():
                           ^^^^^^
  File "/root/miniconda3/lib/pytho