### Import libraries

In [1]:
import os
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, BooleanType, TimestampType, ArrayType
from pyspark.sql.functions import lit, udf, col, when, explode
# import pyodbc
from langdetect import detect
from functools import reduce
# from googletrans import Translator
from dotenv import load_dotenv
# Load environment variables
load_dotenv(".env")



True

In [2]:
# Spark Session
spark = SparkSession.builder.appName("youtube-ETL").config("spark.jars", "/home/elhossiny/Downloads/sqljdbc_12.8.1.0_enu/sqljdbc_12.8/enu/jars/mssql-jdbc-12.8.1.jre11.jar").getOrCreate()


25/02/13 08:13:40 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


### Config connection properties With Sql Server

In [3]:
db_user = os.getenv("DB_USER")
db_password = os.getenv("DB_PASSWORD")
user = "sa"
password = ""

jdbc_url = "jdbc:sqlserver://localhost:1433;databaseName=warehouse;encrypt=true;trustServerCertificate=true"
connection_properties = {
    "user": db_user,
    "password": db_password,
    "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver"

}

In [17]:
files = os.listdir("./data/")
print(files)

['CA_category_id.json', 'CAvideos.csv', 'FRvideos.csv', 'JP_category_id.json', 'JPvideos.csv', 'FR_category_id.json']


In [5]:
def split_files(file_list):
    csv_files = [file for file in file_list if file.endswith('.csv')]
    json_files = [file for file in file_list if file.endswith('.json')]

    return csv_files, json_files

csv_files, json_files = split_files(files)

In [6]:
json_files

['CA_category_id.json', 'JP_category_id.json', 'FR_category_id.json']

In [7]:

schemaVideo = StructType([
    StructField("video_id", StringType(), True),
    StructField("trending_date", StringType(), True),  
    StructField("title", StringType(), True),
    StructField("channel_title", StringType(), True),
    StructField("category_id", StringType(), True),
    StructField("publish_time", TimestampType(), True),  
    StructField("tags", StringType(), True),
    StructField("views", IntegerType(), True),
    StructField("likes", IntegerType(), True),
    StructField("dislikes", IntegerType(), True),
    StructField("comment_count", IntegerType(), True),
    StructField("thumbnail_link", StringType(), True),
    StructField("comments_disabled", BooleanType(), True),
    StructField("ratings_disabled", BooleanType(), True),
    StructField("video_error_or_removed", BooleanType(), True),
    StructField("description", StringType(), True)
])

schemaCategory = StructType([
    StructField("kind", StringType(), True),
    StructField("etag", StringType(), True),
    StructField("items", ArrayType(
        StructType([
            StructField("kind", StringType(), True),
            StructField("etag", StringType(), True),
            StructField("id", StringType(), True),
            StructField("snippet", StructType([
                StructField("channelId", StringType(), True),
                StructField("title", StringType(), True),
                StructField("assignable", BooleanType(), True)
            ]), True)
        ])
    ), True)
])

### Funcations

In [None]:
def Extract_df_From_Files(file):
    try:
        fileExtension = os.path.splitext(file)[1]
        if fileExtension == ".csv":
            df = spark.read.csv(file, header=True, schema=schemaVideo)
            df_limited = df.limit(500)
            return df_limited
        elif fileExtension == ".json":
            df = spark.read.schema(schemaCategory).option("multiLine", "true").json(file)
            good_df = df.select(explode(col("items")).alias("item")) \
            .select(col("item.id"), col("item.snippet.title"))
            return good_df
        else :
            print(f" {file} The extension must be .csv")
            return
    except Exception as e:
        print(f"Error is {e}")

In [9]:
def remove_rows_with_more_than_10_empty(df):
    # Count the number of missing values in each row across all columns
    null_count_expr = sum(when((col(c).isNull()) | (col(c) == ""), 1).otherwise(0) for c in df.columns)
    # Keep rows that contain 6 or fewer missing values
    return df.filter(null_count_expr <= 6)


In [10]:
# def translate_to_english(text):
#     translator = Translator()
#     try:
#         translation = translator.translate(text, dest='en')
#         return translation.text
#     except Exception as e:
#         return None
# translate_udf = udf(translate_to_english, StringType())

In [11]:
def detect_language(text):
    if text is not None:
        try:
            return detect(text)
        except:
            return "unknown"
    return "unknown"

detect_language_udf = udf(detect_language, StringType())

In [12]:
def Transform_df(df, country_code):
    df_without_empty = remove_rows_with_more_than_10_empty(df)
    df_with_country = df_without_empty.withColumn("Country", lit(country_code))
    df_with_languages = df_with_country.withColumn("Language", detect_language_udf(df_with_country['title']))
    # excluded_column = "Country" # The column that should be excluded from the duplicate check
    # columns_to_check = [col for col in df.columns if col != excluded_column]
    df_unique = df_with_languages.dropDuplicates(["video_id"])
    # for col_name in columns:
    #     df_with_languages = df_with_languages.withColumn("Translation_" + col_name, translate_udf(df_with_languages[col_name]))
    return df_unique

In [13]:
# def save_db(df):
#     pandas_df = df.toPandas()
#     job = client.load_table_from_dataframe(pandas_df, table_id )
#     job.result() 
#     print("Data has been successfully uploaded to BigQuery using google-cloud-bigquery!")

def save_db(df, table_name, state):
    # Save DataFrame TO SQL Server
    # append or overwrite
    df.write.mode(state).jdbc( 
      url=jdbc_url,
      table=table_name,
      properties=connection_properties
  )
    print("Data has been successfully uploaded to SQL server!")




## Transformation on files

### Youtube data

In [14]:
for file in csv_files:
    df = Extract_df_From_Files(file)
    country_code = file[:2]  # Extract First 2
    transform_df = Transform_df(df, country_code)
    save_db(transform_df, "youtube_data", "append")

25/02/13 08:13:44 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

Data has been successfully uploaded to SQL server!


                                                                                

Data has been successfully uploaded to SQL server!
Data has been successfully uploaded to SQL server!


### Category data

In [15]:
dfs = []
for file in json_files:
    df = Extract_df_From_Files(file)
    dfs.append(df)

merged_df = reduce(lambda df1, df2: df1.union(df2), dfs)
unique_df = merged_df.dropDuplicates(["id"])
save_db(unique_df, "category_data", "overwrite")

Data has been successfully uploaded to SQL server!


## Some Knowledge about the data


In [16]:
for file in csv_files:
    # Read the file and extract the DataFrame
    df = Extract_df_From_Files(file)
    
    print("Show the first 5 rows:")
    df.show(5)
    
    print("Show the general statistics:")
    df.describe().show()
    
    print("Schema of the DataFrame:")
    df.printSchema()
    
    print("Count missing values in each column:")
    for col in df.columns:
        missing_count = df.filter(F.col(col).isNull()).count()
        print(f"{col}: {missing_count}")

Show the first 5 rows:
+-----------+-------------+--------------------+-------------+-----------+-------------------+--------------------+--------+-------+--------+-------------+--------------------+-----------------+----------------+----------------------+--------------------+
|   video_id|trending_date|               title|channel_title|category_id|       publish_time|                tags|   views|  likes|dislikes|comment_count|      thumbnail_link|comments_disabled|ratings_disabled|video_error_or_removed|         description|
+-----------+-------------+--------------------+-------------+-----------+-------------------+--------------------+--------+-------+--------+-------------+--------------------+-----------------+----------------+----------------------+--------------------+
|n1WpP7iowLc|     17.14.11|Eminem - Walk On ...|   EminemVEVO|         10|2017-11-10 19:00:03|"Eminem"|"Walk"|"...|17158579| 787425|   43420|       125882|https://i.ytimg.c...|            false|           fals

## Thank you for reading my code