In [3]:
from pyspark.sql import SparkSession

# Create (or reuse, via getOrCreate) the Spark session with fixed executor sizing.
# NOTE(review): "<your-spark-image>" looks like a placeholder; it must be replaced
# with a real container image before this runs on Kubernetes.
spark_settings = {
    "spark.executor.instances": "5",
    "spark.executor.memory": "4g",
    "spark.executor.cores": "2",
    "spark.kubernetes.container.image": "<your-spark-image>",
}

builder = SparkSession.builder.appName("ISSN_ISBN_Comparison")
for setting, value in spark_settings.items():
    builder = builder.config(setting, value)
spark = builder.getOrCreate()

In [6]:
# Load the validation DataFrame (issn_isbn.csv): the identifiers we want to keep.
# No inferSchema: every column stays a string, so ISSN/ISBN identifiers keep their
# exact textual form (leading zeros, hyphens, a trailing 'X') for the string
# equality join in the next cell, and Spark skips the extra full pass over the
# data that schema inference requires.
issn_isbn_df = spark.read.option("header", "true").csv('/share/smartdata/citations/issn_isbn.csv')

# Load the input DataFrame (checkpoint_4_processed/*.csv), also as plain strings
input_files_df = spark.read.option("header", "true").csv('/share/smartdata/citations/checkpoint_4_processed/*.csv')

                                                                                

In [7]:
output_folder = "/share/smartdata/citations/checkpoint_spark_issn"

# Keep only the input rows whose "issn/isbn" value appears in the Identifier
# column of issn_isbn.csv. A left semi join returns only the left side's columns
# and never duplicates a left row, however many right rows match it.
issn_matches = input_files_df["issn/isbn"] == issn_isbn_df["Identifier"]
filtered_df = input_files_df.join(issn_isbn_df, on=issn_matches, how="left_semi")

# Write the filtered rows out; the following cells read them back from this folder
filtered_df.write.option("header", "true").csv(output_folder, mode='overwrite')
print("Operation successful")

                                                                                

Operation successful


In [10]:
# Preview the first row of the ISSN/ISBN-filtered data built above.
# Uses filtered_df explicitly: no earlier cell defines `df`, so reading `df` here
# fails on a fresh kernel. NOTE(review): confirm filtered_df is the intended frame.
first_row = filtered_df.first()

# DataFrame.first() returns None when the DataFrame has no rows
if first_row is None:
    print("The filtered DataFrame is empty")
else:
    # Print each value from the first row on its own line
    for label, column in [
        ("ID", "id"),
        ("Title", "title"),
        ("Author", "author"),
        ("Venue", "venue"),
        ("Publication Date", "pub_date"),
        ("ISSN/ISBN", "issn/isbn"),
    ]:
        print(f"{label}:", first_row[column])

ID: omid:br/06604017286 pmid:968900
Title: Rga (Rodgers) And The HLA Region: Linkage And Associations.
Author: C M Giles [omid:ra/066010948469]
Venue: Tissue Antigens [omid:br/0614049880 issn:1399-0039 issn:0001-2815]
Publication Date: 1976
ISSN/ISBN: 0001-2815


### Ora che ho eliminato tutte le righe il cui ISSN/ISBN non compare in issn_isbn.csv, e quindi mi restano solo le venue che mi interessano, conto quante righe ci sono in totale

In [11]:
# Output folder of the ISSN/ISBN filtering step (a directory of Spark part files)
data_directory = "/share/smartdata/citations/checkpoint_spark_issn"

# Spark reads every CSV part in the directory as one DataFrame; count its rows
all_data_df = (
    spark.read
    .option("header", "true")
    .csv(data_directory)
)
row_count = all_data_df.count()

print(f"The total number of rows across all datasets is: {row_count}")




The total number of rows across all datasets is: 123126818


                                                                                

### Cerchiamo i CSV vuoti, da eliminare eventualmente

In [15]:
from pyspark.sql import SparkSession
import os
import glob

# Set to True to actually delete the empty part files found below.
DELETE_EMPTY_FILES = False

# List all CSV part files in data_directory (the filtering step's output folder)
files = glob.glob(os.path.join(data_directory, '*.csv'))

# A file counts as empty when, after its header line, Spark finds no data rows.
empty_files = []
for file_path in files:
    # Local name (not `df`) so this loop does not clobber a notebook-level DataFrame
    file_df = spark.read.option("header", "true").csv(file_path)
    # head(1) only needs one row, unlike count(), which scans the whole file
    if not file_df.head(1):
        empty_files.append(file_path)
        if DELETE_EMPTY_FILES:
            os.remove(file_path)
            print(f"Deleted empty file: {file_path}")
        else:
            print(f"Found empty file: {file_path}")

# Summary of what was found (and, only when enabled, deleted)
if DELETE_EMPTY_FILES:
    print("Empty files deleted:", empty_files)
else:
    print("Empty files identified (not deleted):", empty_files)


Empty files identified (not deleted): []


### Uniformo il formato di pub_date tenendo solo l'anno (AAAA) e salvo il risultato in update_date

In [2]:
from pyspark.sql.functions import regexp_extract, col

data_directory = "/share/smartdata/citations/checkpoint_spark_issn"
output_directory = "/share/smartdata/citations/update_date"

# Replace pub_date with its leading four-digit year. regexp_extract yields an
# empty string for values that do not start with four digits.
year_only = regexp_extract(col("pub_date"), r"^(\d{4})", 1)
df = (
    spark.read.option("header", "true")
    .csv(data_directory + "/*.csv")
    .withColumn("pub_date", year_only)
)

# Write the normalised data to its own folder
df.write.option("header", "true").csv(output_directory, mode="overwrite")


                                                                                

In [3]:
# Get distinct publication dates
unique_pub_dates = df.select("pub_date").distinct()

# Show the unique publication dates
unique_pub_dates.show(truncate=False)



+--------+
|pub_date|
+--------+
|1953    |
|1903    |
|1957    |
|1897    |
|1987    |
|1880    |
|1956    |
|1936    |
|2016    |
|2020    |
|2012    |
|1958    |
|1910    |
|1943    |
|1915    |
|1972    |
|1931    |
|1938    |
|1926    |
|1988    |
+--------+
only showing top 20 rows



                                                                                

In [4]:
from pyspark.sql.functions import col, year

# pub_date is not a date column: after the normalisation cell it holds a
# four-digit year string (or an empty string when no year was found). Cast it to
# an integer explicitly rather than calling year(), which only worked through an
# implicit string -> date cast. With ANSI mode off (Spark 3.x default), empty
# strings and nulls become null. NOTE(review): under spark.sql.ansi.enabled=true
# the cast of "" fails, as the original date cast would.
df = df.withColumn("year", col("pub_date").cast("int"))

# Number of publications per year, in ascending year order (null year first)
yearly_counts = df.groupBy("year").count().orderBy("year")
yearly_counts.show()

# Bring the per-year aggregate back to the driver as a list of Rows
yearly_data = yearly_counts.collect()

                                                                                

+----+-----+
|year|count|
+----+-----+
|null| 2439|
|1512|    1|
|1799|   20|
|1800|   38|
|1801|   27|
|1802|   21|
|1803|   44|
|1804|   13|
|1805|   33|
|1806|   21|
|1807|   21|
|1808|   24|
|1809|   23|
|1810|   21|
|1811|   28|
|1812|   29|
|1813|   14|
|1814|   30|
|1815|   27|
|1816|   22|
+----+-----+
only showing top 20 rows



                                                                                

### Non ci sono file vuoti, adesso andiamo a creare i csv divisi per anno

In [6]:
import os

from pyspark.sql.functions import regexp_extract

# Source: the CSVs whose pub_date has already been reduced to a year
input_directory = "/share/smartdata/citations/update_date"
# Destination: one year=... sub-directory per publication year
output_directory = "/share/smartdata/citations/update_year_csv"

os.makedirs(output_directory, exist_ok=True)

# Derive the partition key from the first four-digit run in pub_date. Rows
# without one get an empty year and end up in the year=__HIVE_DEFAULT_PARTITION__
# directory that the next cell reports.
year_column = regexp_extract("pub_date", r"(\d{4})", 1)
all_data_df = (
    spark.read.option("header", "true")
    .csv(input_directory)
    .withColumn("year", year_column)
)

# One folder per distinct year value
all_data_df.write.partitionBy("year").option("header", "true").csv(
    output_directory, mode="overwrite"
)



                                                                                

In [7]:
import os
import glob
import re

# Directory containing CSV files organized by year (year=YYYY sub-folders)
output_directory = "/share/smartdata/citations/update_year_csv"

# Latest publication year considered plausible; later years count as invalid
MAX_VALID_YEAR = 2024

# count: partitions with a year above MAX_VALID_YEAR; count_valid_csv: all others
count = 0
count_valid_csv = 0

for folder in os.listdir(output_directory):
    # Only year=... partition folders are relevant; skip any other entry
    if not folder.startswith("year="):
        continue
    # `partition_year` rather than `year`, so this loop does not shadow the
    # pyspark.sql.functions.year imported in an earlier cell
    try:
        partition_year = int(folder.split("=", 1)[1])
    except ValueError:
        # e.g. year=__HIVE_DEFAULT_PARTITION__ (rows with no extractable year)
        print(f"Skipping non-year folder: {folder}")
        continue
    if partition_year > MAX_VALID_YEAR:
        count += 1
    else:
        count_valid_csv += 1

print(f"Number of directories with years more than {MAX_VALID_YEAR}: {count}")
print(f"Number of directories with years less or equal than {MAX_VALID_YEAR}: {count_valid_csv}")


Skipping non-year folder: year=__HIVE_DEFAULT_PARTITION__
Number of directories with years more than 2024: 1
Number of directories with years less or equal than 2024: 227


In [8]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import input_file_name, regexp_extract
from pyspark.storagelevel import StorageLevel

base_path = "/share/smartdata/citations/update_year_csv"

# Read every part file of every year=... partition. No inferSchema: no column
# is inspected here, and schema inference would cost an extra full pass.
df = spark.read.option("header", "true").csv(base_path + "/*/part-*.csv")
df = df.withColumn("file_path", input_file_name())  # Add file path to each row
df = df.repartition(200)  # Repartition the DataFrame to manage memory more effectively
# Keep a handle on the persisted frame so it can be released after the counts
cached_df = df.persist(StorageLevel.MEMORY_AND_DISK)

# Recover the partition value from the file path. Raw string, so "\d" is a regex
# escape rather than an invalid Python string escape.
df = cached_df.withColumn(
    "year", regexp_extract("file_path", r"year=(\d+|__HIVE_DEFAULT_PARTITION__)", 1)
)

# Years are compared as strings. That matches numeric order because the
# partitions were built from four-digit years, and "__HIVE_DEFAULT_PARTITION__"
# sorts after every digit string ('_' > '9'), so it lands only in invalid_df.
invalid_df = df.filter((df["year"] > "2024") | (df["year"] == "__HIVE_DEFAULT_PARTITION__"))
valid_df = df.filter(df["year"] <= "2024")

# Count the invalid and valid rows (both reuse the persisted data)
invalid_rows = invalid_df.count()
valid_rows = valid_df.count()

# Release the cached data now that both counts are done
cached_df.unpersist()

# Output the counts
print("Invalid rows count:", invalid_rows)
print("Valid rows count:", valid_rows)




Invalid rows count: 2471
Valid rows count: 123124347


                                                                                

### Salvo le righe con anno valido (anno presente e ≤ 2024) in una cartella e quelle non valide in un'altra

In [4]:
import os
from pyspark.sql.functions import regexp_extract, col

input_directory = "/share/smartdata/citations/update_year_csv"

# Output directories for partitioned CSV files
output_directory_valid = "/share/smartdata/citations/UPDATED_VALID_YEAR_DF"
output_directory_invalid = "/share/smartdata/citations/UPDATED_UNVALID_YEAR_DF"

# Create the output directories if they do not exist
os.makedirs(output_directory_valid, exist_ok=True)
os.makedirs(output_directory_invalid, exist_ok=True)

# Read all CSV files in the input directory into a single DataFrame
all_data_df = spark.read.option("header", "true").csv(input_directory)

# Re-derive 'year' from pub_date. This overwrites any 'year' column that Spark
# exposes from the year=... partition folders of the input directory.
all_data_df = all_data_df.withColumn("year", regexp_extract("pub_date", r"(\d{4})", 1))

# A row is valid only if a year was actually found and it is not after 2024.
# regexp_extract returns an empty string (not null) when there is no match, so
# "" must be rejected explicitly: an isNotNull() check alone lets those rows into
# the valid set, because "" <= "2024" is true. String comparison is otherwise
# safe because extracted years always have exactly four digits.
has_year = col("year").isNotNull() & (col("year") != "")
is_valid_year = has_year & (col("year") <= "2024")

# is_valid_year is never null (a null year makes the first conjunct False), so
# these two filters split the data into disjoint, exhaustive sets.
valid_data_df = all_data_df.filter(is_valid_year)
invalid_data_df = all_data_df.filter(~is_valid_year)

# Writing out the valid data partitioned by 'year'; invalid rows go to one folder
valid_data_df.write.partitionBy("year").option("header", "true").csv(output_directory_valid, mode="overwrite")
invalid_data_df.write.option("header", "true").csv(output_directory_invalid, mode="overwrite")


                                                                                

In [1]:
# Close the Spark session
spark.stop()

24/10/02 19:19:37 WARN ExecutorPodsWatchSnapshotSource: Kubernetes client has been closed.
