## Get Data from HDFS

In [24]:
import pandas as pd
import requests
import json
import os
from hdfs import InsecureClient
from dotenv import load_dotenv
from datetime import datetime, timedelta
from time import sleep

# Function to get Actor detail
def get_actors_details(actors_filepath):
    with hdfs_client.read(actors_filepath) as hdfs_file:
        actors_content = hdfs_file.read()
        json_objects = []
        for line in actors_content.splitlines():
            try:
                json_object = json.loads(line)
                json_objects.append(json_object)
            except:
                continue
        return json_objects
    
# Function to get movies details
def get_movies_details(movies_filepath):
    with hdfs_client.read(movies_filepath) as hdfs_file:
        movies_content = hdfs_file.read()
        json_objects = []
        for line in movies_content.splitlines():
            try:
                json_object = json.loads(line)
                json_objects.append(json_object)
            except:
                continue
        return json_objects
    
# Function to get movie and actor relationship
def get_acted_details(acted_filepath):
    with hdfs_client.read(acted_filepath) as hdfs_file:
        acted_content = hdfs_file.read()
        json_objects = []
        for line in acted_content.splitlines():
            try:
                json_object = json.loads(line)
                json_objects.append(json_object)
            except:
                continue
        return json_objects

# Load environment variables from .env
load_dotenv()

# Get the API key (Create a .env file and add API_KEY variable in it)
API_KEY = os.getenv('API_KEY')

# Specify credentials
user_name = 'hicham'
host = 'http://localhost:9870'

# Connect to HDFS
hdfs_client = InsecureClient(host, user=user_name)

# Specify files path directory name as today
hdfs_file_path = f"{datetime.now().strftime('%d-%m-%Y')}"

# Get directories located in today directory
directories = hdfs_client.list(hdfs_file_path, status=True)

# Get directories names to iterate them
directory_names = [directory[1]['pathSuffix'] for directory in directories if directory[1]['type'] == 'DIRECTORY']

actors = []
movies = []
acted = []
for directory in directory_names:
    # HDFS files path
    actors_filepath = hdfs_file_path + f"/{directory}/" + "actors.json"
    movies_filepath = hdfs_file_path + f"/{directory}/" + "movies.json"
    acted_filepath = hdfs_file_path + f"/{directory}/" + "acted.json"

    actors.extend(get_actors_details(actors_filepath))
    movies.extend(get_movies_details(movies_filepath))
    acted.extend(get_acted_details(acted_filepath))


## Connect to sql server

In [25]:
import pyodbc as odbc

# Set up the connection string
driver = 'ODBC Driver 17 for SQL Server'
server = '169.254.26.93'
database = 'master'
username = 'hicham'
password = '123@123'

connection_string = f"DRIVER={driver};SERVER={server};DATABASE={database};UID={username};PWD={password}" # UID={username};PWD={password}

# Connect to default database;
connection = odbc.connect(connection_string, autocommit=True)
cursor = connection.cursor()

## SQL Statements execution

In [26]:
# Function to execute SQL statements
def execute_sql_statement(sql):
    cursor.execute(sql)
    connection.commit()

## Create the Database and use it

In [27]:
# Create Database MovieDB
sql = ('''
    IF EXISTS (SELECT name FROM sys.databases WHERE name = 'MovieDB')
        PRINT 'Database MovieDB exist'
    ELSE
        CREATE DATABASE MovieDB;
    
    USE MovieDB;
''')
execute_sql_statement(sql)

## Create Tables

In [28]:
# Function to create Production_Country table
def create_table_production_country():
    sql = """
    IF NOT EXISTS (SELECT * FROM sysobjects WHERE name='Production_Country' and xtype='U')
        CREATE TABLE [Production_Country] (
        [prod_country_id] Varchar(10),
        [name] Varchar(55),
        PRIMARY KEY ([prod_country_id])
    )
    """
    execute_sql_statement(sql)

# Function to create Country table
def create_table_country():
    sql = """
    IF NOT EXISTS (SELECT * FROM sysobjects WHERE name='Country' and xtype='U')
        CREATE TABLE [Country] (
        [country_id] Integer,
        [name] Varchar(255),
        PRIMARY KEY ([country_id])
        )
    """
    execute_sql_statement(sql)

# Function to create City table
def create_table_city():
    sql = """
    IF NOT EXISTS (SELECT * FROM sysobjects WHERE name='City' and xtype='U')
        CREATE TABLE [City] (
        [city_id] Integer,
        [name] Varchar(255),
        PRIMARY KEY ([city_id])
        )
    """
    execute_sql_statement(sql)

# Function to create Gender table
def create_table_gender():
    sql = """
    IF NOT EXISTS (SELECT * FROM sysobjects WHERE name='Gender' and xtype='U')
        CREATE TABLE [Gender] (
        [gender_id] Integer,
        [name] Varchar(50),
        PRIMARY KEY ([gender_id])
        )
    """
    execute_sql_statement(sql)

# Function to create Department table
def create_table_department():
    sql = """
    IF NOT EXISTS (SELECT * FROM sysobjects WHERE name='Department' and xtype='U')
        CREATE TABLE [Department] (
        [dep_id] Integer,
        [name] Varchar(255),
        PRIMARY KEY ([dep_id])
        )
    """
    execute_sql_statement(sql)

# Function to create Actor table
def create_table_actor():
    sql = """
    IF NOT EXISTS (SELECT * FROM sysobjects WHERE name='Actor' and xtype='U')
        CREATE TABLE [Actor] (
        [actor_id] Integer,
        [gender_id] Integer,
        [dep_id] Integer,
        [country_id] Integer,
        [city_id] Integer,
        [full_name] varchar(255),
        [profile_path] Varchar(255),
        [birthday] Date,
        [deathday] Date,
        PRIMARY KEY ([actor_id]),
        CONSTRAINT [FK_Actor.country_id]
            FOREIGN KEY ([country_id])
            REFERENCES [Country]([country_id]),
        CONSTRAINT [FK_Actor.gender_id]
            FOREIGN KEY ([gender_id])
            REFERENCES [Gender]([gender_id]),
        CONSTRAINT [FK_Actor.dep_id]
            FOREIGN KEY ([dep_id])
            REFERENCES [Department]([dep_id]),
        CONSTRAINT [FK_Actor.city_id]
            FOREIGN KEY ([city_id])
            REFERENCES [City]([city_id])
        )
    """
    execute_sql_statement(sql)

# Function to create Movie table
def create_table_movie():
    sql = """
    IF NOT EXISTS (SELECT * FROM sysobjects WHERE name='Movie' and xtype='U')
        CREATE TABLE [Movie] (
        [movie_id] Integer,
        [title] Varchar(255),
        [original_title] Varchar(255),
        [original_language] Varchar(5),
        [overview] Text,
        [poster_path] Varchar(255),
        [release_date] Date,
        PRIMARY KEY ([movie_id])
        )
    """
    execute_sql_statement(sql)

# Function to create Production_Company table
def create_table_production_company():
    sql = """
    IF NOT EXISTS (SELECT * FROM sysobjects WHERE name='Production_Company' and xtype='U')
        CREATE TABLE [Production_Company] (
        [prod_company_id] Integer,
        [name] Varchar(255),
        [logo_path] Varchar(255),
        [origin_country] Varchar(10),
        PRIMARY KEY ([prod_company_id])
        )
    """
    execute_sql_statement(sql)

# Function to create Movie_Prod_Company table
def create_table_movie_prod_company():
    sql = """
    IF NOT EXISTS (SELECT * FROM sysobjects WHERE name='Movie_Prod_Company' and xtype='U')
        CREATE TABLE [Movie_Prod_Company] (
        [movie_id] Integer,
        [prod_company_id] Integer,
        CONSTRAINT [FK_Movie_Prod_Company.movie_id]
            FOREIGN KEY ([movie_id])
            REFERENCES [Movie]([movie_id]),
        CONSTRAINT [FK_Movie_Prod_Company.prod_company_id]
            FOREIGN KEY ([prod_company_id])
            REFERENCES [Production_Company]([prod_company_id])
        )
    """
    execute_sql_statement(sql)

# Function to create Movie_Prod_Country table
def create_table_movie_prod_country():
    sql = """
    IF NOT EXISTS (SELECT * FROM sysobjects WHERE name='Movie_Prod_Country' and xtype='U')
        CREATE TABLE [Movie_Prod_Country] (
        [movie_id] Integer,
        [prod_country_id] Varchar(10),
        CONSTRAINT [FK_Movie_Prod_Country.movie_id]
            FOREIGN KEY ([movie_id])
            REFERENCES [Movie]([movie_id]),
        CONSTRAINT [FK_Movie_Prod_Country.prod_country_id]
            FOREIGN KEY ([prod_country_id])
            REFERENCES [Production_Country]([prod_country_id])
        )
    """
    execute_sql_statement(sql)

# Function to create Genre table
def create_table_genre():
    sql = """
    IF NOT EXISTS (SELECT * FROM sysobjects WHERE name='Genre' and xtype='U')
        CREATE TABLE [Genre] (
        [genre_id] Integer,
        [name] Varchar(55),
        PRIMARY KEY ([genre_id])
        )
    """
    execute_sql_statement(sql)

# Function to create Movie_Genre table
def create_table_movie_genre():
    sql = """
    IF NOT EXISTS (SELECT * FROM sysobjects WHERE name='Movie_Genre' and xtype='U')
        CREATE TABLE [Movie_Genre] (
        [movie_id] Integer,
        [genre_id] Integer,
        CONSTRAINT [FK_Movie_Genre.movie_id]
            FOREIGN KEY ([movie_id])
            REFERENCES [Movie]([movie_id]),
        CONSTRAINT [FK_Movie_Genre.genre_id]
            FOREIGN KEY ([genre_id])
            REFERENCES [Genre]([genre_id])
        )
    """
    execute_sql_statement(sql)

# Function to create Realization table
def create_table_realization():
    sql = """
    IF NOT EXISTS (SELECT * FROM sysobjects WHERE name='Realization' and xtype='U')
        CREATE TABLE [Realization] (
        [actor_id] Integer,
        [movie_id] Integer,
        [actor_popularity] Float,
        [movie_budget] NUMERIC(18,0),
        [movie_revenue] NUMERIC(18,0),
        [movie_popularity] Float,
        [movie_vote_average] Float,
        [movie_vote_count] NUMERIC(18,0),
        CONSTRAINT [FK_realization.actor_id]
            FOREIGN KEY ([actor_id])
            REFERENCES [Actor]([actor_id]),
        CONSTRAINT [FK_realization.movie_id]
            FOREIGN KEY ([movie_id])
            REFERENCES [Movie]([movie_id])
        )
    """
    execute_sql_statement(sql)


In [29]:
create_table_genre()
create_table_production_company()
create_table_production_country()
create_table_movie()
create_table_movie_genre()
create_table_movie_prod_company()
create_table_movie_prod_country()
create_table_gender()
create_table_department()
create_table_country()
create_table_city()
create_table_actor()
create_table_realization()

## Insert Values

In [30]:
# Function to execute INSERT INTO TABLE statements
def execute_insert_values(sql, values):
    try:
        cursor.execute(sql, values)
        connection.commit()
        print("Insertion successful!")
    except odbc.IntegrityError as e:
        print("Duplicate key value error!")
        print(f"Error message: {str(e)}")
        pass

# Function to insert data into the table
def insert_data_into_table(table_name, data):
    keys = data.keys()
    values = list(data.values())

    placeholders = ', '.join('?' * len(keys))
    columns = ', '.join(keys)
    sql = f"INSERT INTO {table_name} ({columns}) VALUES ({placeholders})"
    
    execute_insert_values(sql, values)

    # sql = """
    # INSERT INTO [Realization] (
    #     [actor_id],
    #     [movie_id],
    #     [actor_popularity],
    #     [movie_budget],
    #     [movie_revenue],
    #     [movie_popularity],
    #     [movie_vote_average],
    #     [movie_vote_count]
    # )
    # VALUES (?, ?, ?, ?, ?, ?, ?, ?)
    # """
    # values = (
    #     object['actor_id'],
    #     object['movie_id'],
    #     object['actor_popularity'],
    #     object['movie_budget'],
    #     object['movie_revenue'],
    #     object['movie_popularity'],
    #     object['movie_vote_average'],
    #     object['movie_vote_count']
    # )
    # execute_insert_values

In [31]:
# CREATE INDEX [PK/FK] ON  [Movie_Prod_Company] ([movie_id], [prod_company_id]);

# CREATE INDEX [PK/FK] ON  [Movie_Prod_Country] ([movie_id], [prod_country_id]);

# CREATE INDEX [PK/FK] ON  [Movie_Genre] ([movie_id], [genre_id]);

# CREATE INDEX [PK/FK] ON  [realization] ([actor_id], [movie_id]);

## Prepare data for insertion

In [36]:
from geopy.geocoders import Nominatim

# Function to find an element by id
def find_element_by_id(el_list, type, id):
    for el in el_list:
        if type == 'movie' and el['movie_id'] == id:
            return el
        elif type == 'actor' and el['actor_id'] == id:
            return el
    return None

# Function to get departement info by name
def get_department_info(department_name):
    departments = {
        'Writing': {'dep_id': 1, 'name': 'Writing'},
        'Creator': {'dep_id': 2, 'name': 'Creator'},
        'Art': {'dep_id': 3, 'name': 'Art'},
        'Costume & Make-Up': {'dep_id': 4, 'name': 'Costume & Make-Up'},
        'Editing': {'dep_id': 5, 'name': 'Editing'},
        'Directing': {'dep_id': 6, 'name': 'Directing'},
        'Production': {'dep_id': 7, 'name': 'Production'},
        'Camera': {'dep_id': 8, 'name': 'Camera'},
        'Sound': {'dep_id': 9, 'name': 'Sound'},
        'Crew': {'dep_id': 10, 'name': 'Crew'},
        'Acting': {'dep_id': 11, 'name': 'Acting'},
    }
    department = departments.get(department_name)
    if department is None:
        # Add the new department to the dictionary if not exist
        new_dep_id = len(departments) + 1
        department = {'dep_id': new_dep_id, 'name': department_name}
        departments[department_name] = department
        print("New departement was added : ", departments[department_name])

    return department

# Function to get gender name by id
def get_gender_info(id):
    genders = {
        '0' : 'Not specified',
        '1' : 'Female',
        '2' : 'Male'
    }
    gender = genders.get(id)
    if not gender:
        return genders.get(0)
    return gender

# Function to encode the country name and city name to int code as id
def encode_item(name, length):
    mybytes = name.encode('utf-8')
    myint = int.from_bytes(mybytes, 'little')
    encoded_value = str(myint)[:length]
    return int(encoded_value)

# Correct the name of the country or city to be recognized
def recognize_place_of_birth(place):
    # Manual correction mapping for non-standard place names
    place_name_mapping = {
        "U.S.": "United States",
        "UK": "United Kingdom",
        "USSR (Russia)": "Russia",
        "Paddington (Circle and Hammersmith & City lines)": "Paddington"
    }
    # Manually correct the place of birth if not recognized by the geocoding service
    corrected_place_of_birth = place
    for key, value in place_name_mapping.items():
        corrected_place_of_birth = corrected_place_of_birth.replace(key, value)
    
    return corrected_place_of_birth

# Function to check if the input is city or not
def is_country(text):
    print("Checking address details wait 3 seconds..")
    sleep(3)
    geolocator = Nominatim(user_agent="my_geocoder2")
    location = geolocator.geocode(text, exactly_one=True, addressdetails=True, language='en')
    
    if location is not None and 'address' in location.raw:
        address = location.raw
        if 'country' in address and 'city' not in address:
            return True
    
    return False

# Function to get place of birth of actor using geopy library
def get_location_info(place_of_birth):
    if place_of_birth:
        max_retries = 3
        retry_count = 0
        success = False
        while retry_count < max_retries and not success:
            try:
                geolocator = Nominatim(user_agent="my_geocoder")
                # Retrieve location information based on place of birth
                print("start retrieving address details..")
                location = geolocator.geocode(place_of_birth, exactly_one=True, language='en')
                print("Checking address, wait 5 seconds..")
                sleep(5)
                if location is not None:
                    country = location.address.split(',')[-1].strip()
                    city = location.address.split(',')[0].strip()
                    country = recognize_place_of_birth(country)
                    city = recognize_place_of_birth(city)
                    # Here I test if the name of the city returned is correct or not
                    if not is_country(city):
                        id_country = encode_item(country, 8)
                        id_city = encode_item(city, 8)
                        if country and city:
                            return {
                                'country': {'id' : id_country, 'name' : country}, 
                                'city': {'id' : id_city, 'name' : city}
                            }
                    else:
                        id_country = encode_item(country, 8)
                        if country and city:
                            return {
                                'country': {'id' : id_country, 'name' : country}, 
                                'city': {'id' : 0, 'name' : 'None'}
                            }
                else:
                    # Split the place of birth manually if the geopy didn't recognize the address
                    place_of_birth = recognize_place_of_birth(place_of_birth)
                    country = place_of_birth.split(',')[-1].strip()
                    city = place_of_birth.split(',')[0].strip()
                    id_country = encode_item(country, 8)
                    id_city = encode_item(city, 8)
                    if country and city:
                        return {
                            'country': {'id' : id_country, 'name' : country}, 
                            'city': {'id' : id_city, 'name' : city}
                        }
            except ConnectionError as e:
                print("Failed to establish a network connection. Please check your internet connectivity.")
                continue
            retry_count += 1
        if not success:
            print("Maximum number of retries exceeded. Failed to get a successful response.")
    return None

realisations = []
for act in acted:
    movie = find_element_by_id(movies, 'movie', act.get('movie_id'))
    actor = find_element_by_id(actors, 'actor', act.get('actor_id'))
    is_actor_inserted = False
    if movie and actor:
        realization_object = {
            'actor_id' : act.get('actor_id'),
            'movie_id' : act.get('movie_id'),
            'actor_popularity' : actor.get('popularity'),
            'movie_budget' : movie.get('budget'),
            'movie_revenue' : movie.get('revenue'),
            'movie_popularity' : movie.get('popularity'),
            'movie_vote_average' : movie.get('vote_average'),
            'movie_vote_count' : movie.get('vote_count'),
        }
        print(realization_object)

        if movie:
            movie_object = {
                'movie_id' : movie.get('movie_id'),
                'title' : movie.get('title'),
                'original_title' : movie.get('original_title'),
                'original_language' : movie.get('original_language'),
                'overview' : movie.get('overview'),
                'poster_path' : movie.get('poster_path'),
                'release_date' : movie.get('release_date'),
            }
            # print(movie)
            for genre in movie.get('genres'):
                movie_genre = {
                    'movie_id' : movie.get('movie_id'),
                    'genre_id' : genre.get('id')
                }
                genre = {
                    'genre_id' : genre.get('id'),
                    'name' : genre.get('name')
                }
                # print(genre)
            for company in movie.get('production_companies'):
                movie_prod_company = {
                    'movie_id' : movie.get('movie_id'),
                    'prod_company_id' : company.get('id')
                }
                production_company = {
                    'prod_company_id' : company.get('id'),
                    'name' : company.get('name'),
                    'logo_path' : company.get('logo_path'),
                    'origin_country' : company.get('origin_country'),
                }
                # print(str(movie_prod_company) + "\n" + str(production_company))
            for country in movie.get('production_countries'):
                movie_prod_country = {
                    'movie_id' : movie.get('movie_id'),
                    'prod_country_id' : country.get('iso_3166_1')
                }
                production_country = {
                    'prod_country_id' : country.get('iso_3166_1'),
                    'name' : country.get('name')
                }
                # print(str(movie_prod_country) + "\n" + str(production_country))

        if actor and not is_actor_inserted:
            dep = get_department_info(actor.get('department'))
            department = {
                'dep_id' : dep.get('dep_id'),
                'name' : dep.get('name'),
            }
            # print(department)
            gender = {
                'gender_id' : actor.get('gender'),
                'name' : get_gender_info(str(actor.get('gender'))),
            }
            # print(gender)
            # print(actor.get('place_of_birth'))
            location_info = get_location_info(actor.get('place_of_birth'))
            if location_info:
                country = location_info.get('country')
                city = location_info.get('city')
                # Create country and city objects
                country_object = {
                    'country_id' : country.get('id'),
                    'name' : country.get('name'),
                }
                city_object = {
                    'city_id' : city.get('id'),
                    'name' : city.get('name'),
                }
                
            else:
                country_object = {
                    'country_id' : 0,
                    'name' : 'None',
                }
                city_object = {
                    'city_id' : 0,
                    'name' : 'None',
                }
            # print(actor.get('place_of_birth'))
            # print(country_object)
            # print(city_object)
            # print("\n")
            actor_object = {
                'actor_id' : actor.get('actor_id'),
                'gender_id' : gender.get('gender_id'),
                'dep_id' : department.get('dep_id'),
                'country_id' : country_object.get('country_id'),
                'city_id' : city_object.get('city_id'),
                'full_name' : actor.get('name'),
                'profile_path' : actor.get('profile_path'),
                'birthday' : actor.get('birthday'),
                'deathday' : actor.get('deathday')
            }
            # print(actor_object)
            # print("\n")

            is_actor_inserted = True
        
        insert_data_into_table('Movie', movie_object)
        insert_data_into_table('Genre', genre)
        insert_data_into_table('Movie_Genre', movie_genre)
        insert_data_into_table('Production_Company', production_company)
        insert_data_into_table('Movie_Prod_Company', movie_prod_company)
        insert_data_into_table('Production_Country', production_country)
        insert_data_into_table('Movie_Prod_Country', movie_prod_country)
        insert_data_into_table('Department', department)
        insert_data_into_table('Gender', gender)
        insert_data_into_table('Country', country_object)
        insert_data_into_table('City', city_object)
        insert_data_into_table('Actor', actor_object)
        insert_data_into_table('Realization', realization_object)
        
        print("Data inserted successfuly.")

print("Insertion process done.")


{'actor_id': 8691, 'movie_id': 19995, 'actor_popularity': 81.636, 'movie_budget': 237000000, 'movie_revenue': 2923706026, 'movie_popularity': 123.943, 'movie_vote_average': 7.576, 'movie_vote_count': 30351}
start retrieving address details..
Checking address, wait 5 seconds..
Checking address details wait 3 seconds..
Duplicate key value error!
Error message: ('23000', "[23000] [Microsoft][ODBC Driver 17 for SQL Server][SQL Server]Violation of PRIMARY KEY constraint 'PK__Movie__83CDF749ADB3E518'. Cannot insert duplicate key in object 'dbo.Movie'. The duplicate key value is (19995). (2627) (SQLExecDirectW)")
Duplicate key value error!
Error message: ('23000', "[23000] [Microsoft][ODBC Driver 17 for SQL Server][SQL Server]Violation of PRIMARY KEY constraint 'PK__Genre__18428D42EC9C34A4'. Cannot insert duplicate key in object 'dbo.Genre'. The duplicate key value is (878). (2627) (SQLExecDirectW)")
Insertion successful!
Duplicate key value error!
Error message: ('23000', "[23000] [Microsoft

GeocoderUnavailable: HTTPSConnectionPool(host='nominatim.openstreetmap.org', port=443): Max retries exceeded with url: /search?q=New+York%2C+New+York%2C+U.S.A.&format=json&limit=1&accept-language=en (Caused by ReadTimeoutError("HTTPSConnectionPool(host='nominatim.openstreetmap.org', port=443): Read timed out. (read timeout=1)"))