In [None]:
import os
import dotenv
from sqlalchemy import create_engine
import pandas as pd


# Load environment variables
dotenv.load_dotenv(dotenv.find_dotenv())
hostname = os.getenv('DATABASE_HOST')
user = os.getenv('DATABASE_USER')
password = os.getenv('DATABASE_PASSWORD')
database = os.getenv('DATABASE_NAME') 

# Set up database connection using SSL
db_engine = create_engine(f'mysql+pymysql://{user}:{password}@{hostname}/{database}', 
                       connect_args={
                           'ssl_ca': os.getenv('SSL_CA'),
                           'ssl_cert': os.getenv('SSL_CLIENT_CERT'),
                           'ssl_key': os.getenv('SSL_KEY')
                       })

"""
!!!UNENCRYPTED CONNECTION!!!
For use within local network only!
db_engine = create_engine(f'mysql+pymysql://{user}:{password}@{hostname}/{database}') 
!!!UNENCRYPTED CONNECTION!!!
"""

In [None]:
# Function to check data types and max lengths of values
def check_data(dataset):
    for column in dataset.columns:
        print(f'Column: {column}')
        print(f'Data type: {dataset[column].dtype}')
        try:
            print(f'Max length: {dataset[column].str.len().max()}')
        except:
            print('No string data ')
        print('---\n')

In [None]:
path = os.getenv('DATASET_PATH')

# Read JSONs from file
df = pd.read_json(path, lines=True)

check_data(df)

In [None]:
# Function to check if language already exists
def check_and_insert_language(languages_data_entry, db_con):
    # Define query to find existing record
    query = """
        SELECT id
        FROM DIM_Languages
        WHERE listOfLanguages = %(listOfLanguages)s
    """
    
    # Query database for existing record
    query_result = pd.read_sql_query(query, db_con, params=languages_data_entry)

    # Check if a record was found or insert a new record
    if not query_result.empty:
        # Record found -> use its id
        entry_id = query_result.iloc[0]['id']
    else:
        # No record found -> insert new record and query its id
        languages_df = pd.DataFrame([languages_data_entry])
        languages_df.to_sql('DIM_Languages', con=db_con, if_exists='append', index=False)
        entry_id = pd.read_sql_query("SELECT LAST_INSERT_ID()", db_con).iloc[0, 0]
    
    # Return ID
    return entry_id

# Function to check if location already exists
def check_and_insert_location(location_data_entry, db_con):
    # Define query to find existing record
    query = """
        SELECT id
        FROM DIM_Location
        WHERE countryName = %(countryName)s
        AND stateName = %(stateName)s
        AND cityName = %(cityName)s
    """
    
    # Query database for existing record
    query_result = pd.read_sql_query(query, db_con, params=location_data_entry)

    # Check if a record was found or insert a new record
    if not query_result.empty:
        # Record found -> use its id
        entry_id = query_result.iloc[0]['id']
    else:
        # No record found -> insert new record and query its id
        location_df = pd.DataFrame([location_data_entry])
        location_df.to_sql('DIM_Location', con=db_con, if_exists='append', index=False)
        entry_id = pd.read_sql_query("SELECT LAST_INSERT_ID()", db_con).iloc[0, 0]
    
    # Return ID
    return entry_id

In [None]:
# Function to insert data origin and return its ID
def insert_data_origin(data_origin_name, data_origin_url, data_origin_comment):
    df_origin = pd.DataFrame([{
        'name': data_origin_name, 
        'url': data_origin_url, 
        'importDate': pd.Timestamp.now(), 
        'comment': data_origin_comment}])

    # Insert data into database
    df_origin.to_sql('DIM_DataOrigin', con=db_engine, if_exists='append', index=False)
    
    # Get ID of inserted data origin
    id_origin = pd.read_sql_query("SELECT LAST_INSERT_ID()", db_engine).iloc[0, 0]
    
    # Return ID of inserted data origin
    return id_origin

In [None]:
# Function to process person facts + dimensions and insert them into the database
def process_json(json_object, dimension_key_origin, db_con):
    """
    Extract data from LinkedIn profiles JSON and prepare it for insertion into DWH.

    Missing attributes will be added as comments, 
    complex attributes will be receiving their own tables and be set to id None for now.
    """
    
    # Extract data for Person table
    person_data = {
        'idOrigin': dimension_key_origin,
        'linkedInPubId': json_object.get('public_identifier'),
        'profilePictureUrl': json_object.get('profile_pic_url'),
        'backgroundPictureUrl': json_object.get('background_cover_image_url'),
        # Last name
        # First name
        'name': json_object.get('full_name'),
        'occupation': json_object.get('occupation'),
        'profileHeadline': json_object.get('headline'),
        'profileText': json_object.get('summary'),
        'idLocation': None,
        # Experiences
        # Education
        'idLanguages': None,
        # Accomplishments (organisations, publications, honor awards, patents, courses, projects, test scores)
        # Volunteer work
        # Certifications
        # Connections (included but no need to rename)
        # Activities
        # Similar named profiles
        # Articles
        # Groups
        # Skills
        # Infrared salary
        # Github (included but no need to rename)
        # Facebook (included but no need to rename)
        # Gender (will be converted to enum M/F)
        'birthDate': json_object.get('birth_date'),
        # Industry
        # Interests
    }

    # Extract data for Location table
    location_data = {
        'countryName': json_object.get('country_full_name'),
        'countryLetters': json_object.get('country'),
        'stateName': json_object.get('state'),
        'cityName': json_object.get('city')
    }
    
    # Extract data for Languages table
    languages_data = {
        'sumOfSpoken': len(json_object.get('languages', [])),
        'listOfLanguages': ', '.join(json_object.get('languages', []))
    }
    
    # Create DataFrames
    person_df = pd.DataFrame([person_data])
    languages_df = pd.DataFrame([languages_data])
    location_df = pd.DataFrame([location_data])
    
    # Insert languages and add ID to person
    languages_df.to_sql('DIM_Languages', con=db_con, if_exists='append', index=False)
    dimension_key_languages = pd.read_sql_query("SELECT LAST_INSERT_ID()", db_con).iloc[0, 0]
    person_df['idLanguages'] = dimension_key_languages
    
    # Insert location and add ID to person
    location_df.to_sql('DIM_Location', con=db_con, if_exists='append', index=False)
    dimension_key_location = pd.read_sql_query("SELECT LAST_INSERT_ID()", db_con).iloc[0, 0]
    person_df['idLocation'] = dimension_key_location
    
    # Insert person data and get ID
    person_df.to_sql('FACT_Person', con=db_con, if_exists='append', index=False)
    pk_person = pd.read_sql_query("SELECT LAST_INSERT_ID()", db_con).iloc[0, 0]
    
    # Return primary key of inserted person
    return pk_person

In [None]:
    
    # Extract data for Qualification table
    qualification_data = []
    for qualification in json_object.get('qualifications', []):
        qualification_data.append({
            'idOrigin': json_object.get('idOrigin'),
            'idPerson': None,  # Placeholder for idPerson
            'idDuration': None,  # Placeholder for idDuration
            'type': qualification.get('type'),
            'name': qualification.get('name'),
            'idInstitution': None,  # Placeholder for idInstitution
            'description': qualification.get('description')
        })



    qualification_df = pd.DataFrame(qualification_data)

    # Insert data into the database
    engine = create_engine('your_database_url')







    # Insert data into FACT_Qualification table
    qualification_df.to_sql('FACT_Qualification', con=engine, if_exists='append', index=False)

In [1]:
from pymongo import MongoClient


# Function to get schema of a MongoDB collection and return it as a dictionary
def get_schema(collection):
    collection_schema = {}
    total_documents = collection.count_documents({})

    # Iterate over the collection
    for document in collection.find():
        for key, value in document.items():
            # Add attribute to schema if not present already
            if key not in collection_schema:
                collection_schema[key] = {
                    "types": set(),
                    "max_length": 0,
                    # Maybe include median length or 90% quartile?
                    "exists": 0,
                    "null_percentage": 0,
                    "max_value": None
                }
            
            # Determine type
            collection_schema[key]["types"].add(type(value).__name__)
            

            # Calculate size and check if value is null
            if isinstance(value, str):
                length = len(value)
                if length > collection_schema[key]["max_length"]:
                    collection_schema[key]["max_length"] = length
                    collection_schema[key]["max_value"] = value
                
                # Update null counter
                collection_schema[key]["exists"] += 1
            elif isinstance(value, (int, float)):
                # Skip for numbers
                # if collection_schema[key]["max_value"] is None or value > collection_schema[key]["max_value"]:
                #    collection_schema[key]["max_value"] = value
                
                # Update null counter
                collection_schema[key]["exists"] += 1
            elif isinstance(value, list):
                length = len(value)
                if length > collection_schema[key]["max_length"]:
                    collection_schema[key]["max_length"] = length
                
                # Update null counter
                collection_schema[key]["exists"] += 1
            elif value is not None:
                # Update null counter
                collection_schema[key]["exists"] += 1
            elif value is None:
                # Update null counter with null value
                collection_schema[key]["exists"] += 1
                collection_schema[key]["null_percentage"] += 1
            else:
                # Idk why i did this... Me no brain xD
                print(value["_id"])
                print(type(value).__name__)
    
    # Calculate the population
    for key in collection_schema:
        value_populated = collection_schema[key]["exists"]
        collection_schema[key]["exists"] = value_populated / total_documents * 100
        collection_schema[key]["null_percentage"] =\
            (100 - (collection_schema[key]["null_percentage"] / value_populated * 100))

    return collection_schema


# Function to print schema dictionary
def print_schema(schema_dict):
    for key, value in schema_dict.items():
        print(f"Field: {key}")
        print(f"  Types: {', '.join(value['types'])}")
        print(f"  Max Length: {value['max_length']}")
        print(f"  Exists on: {value['exists']:.2f}%")
        print(f"  Population: {value['null_percentage']:.2f}%")
        print(f"  Max Value: {value['max_value']}")
        print()


# Function to get schema of a MongoDB collection and return it as a dictionary
def get_nested_schema(collection, schema_dict):
    nested_schema = {}

    for key, value in schema_dict.items():
        if 'list' in value['types']:
            nested_schema[key] = {}

            total_documents = 0
            for document in collection.find():
                if key in document and isinstance(document[key], list):
                    total_documents += 1
                    for nested_doc in document[key]:
                        if isinstance(nested_doc, dict):
                            for nested_key, nested_value in nested_doc.items():
                                if nested_key not in nested_schema[key]:
                                    nested_schema[key][nested_key] = {
                                        "types": set(),
                                        "max_length": 0,
                                        "exists": 0,
                                        "null_percentage": 0,
                                        "max_value": None
                                    }

                                nested_schema[key][nested_key]["types"].add(type(nested_value).__name__)

                                if isinstance(nested_value, str):
                                    length = len(nested_value)
                                    if length > nested_schema[key][nested_key]["max_length"]:
                                        nested_schema[key][nested_key]["max_length"] = length
                                        nested_schema[key][nested_key]["max_value"] = nested_value

                                    nested_schema[key][nested_key]["exists"] += 1
                                elif isinstance(nested_value, (int, float)):
                                    nested_schema[key][nested_key]["exists"] += 1
                                elif nested_value is not None:
                                    nested_schema[key][nested_key]["exists"] += 1
                                elif nested_value is None:
                                    nested_schema[key][nested_key]["null_percentage"] += 1

            for nested_key, nested_value in nested_schema[key].items():
                value_populated = nested_value["exists"]
                nested_value["exists"] = value_populated / total_documents * 100
                if value_populated > 0:
                    nested_value["null_percentage"] = (
                        100 - (nested_value["null_percentage"] / value_populated * 100)
                    )
                else:
                    nested_value["null_percentage"] = 0

    return nested_schema


# Function to print the nested schema
def print_nested_schema(schema_dict):
    for key, value in schema_dict.items():
        print(f"Nested Field: {key}")
        for nested_key, nested_value in value.items():
            if nested_key != "types":
                print(f"    Attribute: {nested_key}")
                print(f"         Types: {', '.join(nested_value['types'])}")
                print(f"         Max Length: {nested_value['max_length']}")
                print(f"         Exists on: {nested_value['exists']:.2f}%")
                print(f"         Population: {nested_value['null_percentage']:.2f}%")
                print(f"         Max Value: {nested_value['max_value']}")
                print()

In [4]:
import os
from dotenv import load_dotenv


# Load environment variables
load_dotenv()

# MongoDB connection details
client = MongoClient(os.getenv("MongoClientURI"))
db = client["dwh_sources"]

# Build the schema
schema = get_schema(db["kaggle_linkedin_proxycurl_profiles"])

# Print the schema
print_schema(schema)

Field: _id
  Types: ObjectId
  Max Length: 0
  Exists on: 100.00%
  Population: 100.00%
  Max Value: None

Field: public_identifier
  Types: str
  Max Length: 379
  Exists on: 100.00%
  Population: 100.00%
  Max Value: %E2%96%88%E2%95%91%E2%96%8C%E2%94%82%E2%95%91%E2%96%8C%E2%95%91%E2%96%8C%E2%94%82%E2%96%88%E2%94%82%E2%96%8C%E2%95%91%E2%94%82%E2%96%88%E2%95%91-%E2%96%88%E2%95%91%E2%96%8C%E2%94%82%E2%95%91%E2%96%8C%E2%95%91%E2%96%8C%E2%94%82%E2%96%88%E2%94%82%E2%96%8C%E2%95%91%E2%94%82%E2%96%88%E2%95%91%E2%96%88%E2%95%91%E2%94%82%E2%96%8C%E2%95%91%E2%94%82%E2%96%88%E2%95%91%E2%96%8C-86240924

Field: profile_pic_url
  Types: str, NoneType
  Max Length: 963
  Exists on: 100.00%
  Population: 90.32%
  Max Value: https://s3.us-west-000.backblazeb2.com/proxycurl/person/%25E2%2596%2588%25E2%2595%2591%25E2%2596%258C%25E2%2594%2582%25E2%2595%2591%25E2%2596%258C%25E2%2595%2591%25E2%2596%258C%25E2%2594%2582%25E2%2596%2588%25E2%2594%2582%25E2%2596%258C%25E2%2595%2591%25E2%2594%2582%25E2%2596%2588

In [5]:
# Build the schema
schema_nested = get_nested_schema(db["kaggle_linkedin_proxycurl_profiles"], schema)

# Print the schema
print_nested_schema(schema_nested)

Nested Field: experiences
    Attribute: starts_at
         Types: NoneType, dict
         Max Length: 0
         Exists on: 339.57%
         Population: 95.53%
         Max Value: None

    Attribute: ends_at
         Types: NoneType, dict
         Max Length: 0
         Exists on: 257.84%
         Population: 62.42%
         Max Value: None

    Attribute: company
         Types: str, NoneType
         Max Length: 110
         Exists on: 354.43%
         Population: 99.91%
         Max Value: 
          ILP Overseas - 25+ Branches Pan India | IELTS Coaching | Study in Canada USA UK Australia  Germany


    Attribute: company_linkedin_profile_url
         Types: str, NoneType
         Max Length: 663
         Exists on: 259.77%
         Population: 63.44%
         Max Value: https://il.linkedin.com/company/%%D1%%81%%D0%%B0%%D0%%BD%%D0%%BA%%D1%%82-%%D0%%BF%%D0%%B5%%D1%%82%%D0%%B5%%D1%%80%%D0%%B1%%D1%%83%%D1%%80%%D0%%B3%%D1%%81%%D0%%BA%%D0%%B8%%D0%%B9-%%D0%%B3%%D0%%BE%%D1%%81%%D1%%83%%D

In [6]:
import os
from dotenv import load_dotenv


# Load environment variables
load_dotenv()

# MongoDB connection details
client = MongoClient(os.getenv("MongoClientURI"))
db = client["dwh_sources"]

# Build the schema
schema = get_schema(db["kaggle_linkedin_proxycurl_companies"])

# Print the schema
print_schema(schema)

Field: _id
  Types: ObjectId
  Max Length: 0
  Exists on: 100.00%
  Population: 100.00%
  Max Value: None

Field: linkedin_internal_id
  Types: str, NoneType
  Max Length: 8
  Exists on: 100.00%
  Population: 100.00%
  Max Value: 13203740

Field: description
  Types: str, NoneType
  Max Length: 3754
  Exists on: 100.00%
  Population: 98.06%
  Max Value: Today's consumers have choices. People are finding new ways to interact with money and the traditional banking system is not offering the innovation necessary to keep up. Since the introduction of cryptocurrency 10 years ago, it has gained rising acceptance globally. Studies show that there are 60 million unique users in 2017, who actively use a crypto wallet to send, receive and store currencies.

Crowdbank.io solves problems faced by traditional banking customers, while meeting the demands of cryptocurrency its clients. CrowdBank is a repository for the cryptocurrencies of the users, that is why it is a full service cryptobanking solu

In [7]:
# Build the schema
schema_nested = get_nested_schema(db["kaggle_linkedin_proxycurl_companies"], schema)

# Print the schema
print_nested_schema(schema_nested)

Nested Field: company_size
Nested Field: specialities
Nested Field: locations
    Attribute: country
         Types: str, NoneType
         Max Length: 24
         Exists on: 151.40%
         Population: 99.11%
         Max Value: Karlstad Innovation Park

    Attribute: city
         Types: str, NoneType
         Max Length: 89
         Exists on: 151.40%
         Population: 99.11%
         Max Value: Singapore | Hong Kong | Thailand | Malaysia | Indonesia | Philippines | Vietnam | Myanmar

    Attribute: postal_code
         Types: str, NoneType
         Max Length: 12
         Exists on: 100.36%
         Population: 47.81%
         Max Value: 216-662-9100

    Attribute: line_1
         Types: str, NoneType
         Max Length: 316
         Exists on: 143.83%
         Population: 93.80%
         Max Value: Core Central Region:Downtown Core (Marina Bay, Marina Centre, Raffles Place, Tanjong Pagar) Outram Sentosa Rochor Orchard Newton River Valley Bukit Timah Holland Road Tanglin Nov