# ETL Pipeline for Pterodactyl Minecraft Servers Analysis

## Index

- Install requirements
- Import libraries and setup key variables
- Extract data from source
- Transformation from data to information
- Load processed data into Data Warehouse (Postgres)

## Install requirements

In [None]:
# Install the Python packages listed in requirements.txt using pip (notebook shell escape).
# NOTE(review): `!pip` runs whichever pip is first on PATH; confirm it belongs to the kernel's environment.
!pip install -r requirements.txt

## Import libraries and setup key variables

Remember to add your own credentials to the .env file so that they can be loaded here.

In [None]:
import urllib.request, psycopg2, pyarrow, json, csv, gzip, re, os
from sqlalchemy import create_engine, text
from pydactyl import PterodactylClient
from dotenv import load_dotenv
import pandas as pd

# Load the credentials stored in the .env file into the process environment
load_dotenv()

# Database connection settings, read from the environment
host = os.getenv('POSTGRES_HOST')
port = os.getenv('POSTGRES_PORT')
dbname = os.getenv('POSTGRES_DBNAME')
user = os.getenv('POSTGRES_USER')
password = os.getenv('POSTGRES_PASSWORD')
# NOTE(review): the values are interpolated verbatim; a user name or password
# containing characters such as '@', ':' or '/' would need URL-escaping first.
connection = f'postgresql://{user}:{password}@{host}:{port}/{dbname}'

# Pterodactyl connection settings, read from the environment
pterodactyl_url = os.getenv('PTERODACTYL_URL')
client_api_key = os.getenv('PTERODACTYL_API_KEY')

# Connect to the Pterodactyl API and get the list of servers
api = PterodactylClient(pterodactyl_url, client_api_key, debug=False)
servers_object = api.client.servers.list_servers(includes=['egg'])
# Iterating the response yields one list of servers per page. Flatten every
# page instead of keeping only the first one, so that servers beyond the
# first page are not silently ignored.
list_servers = [server['attributes'] for servers in servers_object for server in servers]

## Extract data from source

In [None]:
# Settings shared by the extraction and transformation cells
keys = ['identifier', 'name', 'node']
limits_keys = ['memory', 'disk']
log_pattern = r'^\d{4}-\d{2}-\d{2}-\d.*' # yyyy-mm-dd-n.log.gz
extension_file_compressed = '.log.gz'
extension_file_uncompessed = '.log'
pwd = os.getcwd() #os.path.dirname(os.path.realpath(__file__)) this is used for .py files
raw_data_folder = 'raw_data'
output_folder = os.path.join(pwd, 'output')

eggs_ready = ['Vanilla Minecraft', 'Forge Minecraft', 'Paper'] # Vanilla Bedrock is still not ready to be processed

# Part of the compressed extension that disappears once a log is uncompressed ('.gz')
gz_suffix = extension_file_compressed[len(extension_file_uncompessed):]

for server in list_servers:
    # Collect the server information that is stored as metadata
    server_info = {
        **{key: server[key] for key in keys},
        'egg': server['relationships']['egg']['attributes']['name'],
        'docker_image': server['docker_image'],
        **{key: server['limits'][key] for key in limits_keys},
    }

    # Only servers running a supported egg are processed
    if server_info['egg'] not in eggs_ready:
        continue

    # Create the server folder (no error if it already exists)
    server_id = server_info['identifier']
    path_folder = os.path.join(pwd, raw_data_folder, server_id)
    os.makedirs(path_folder, exist_ok=True)

    # Download Metadata
    with open(os.path.join(path_folder, 'metadata.json'), "w") as json_file:
        json.dump([server_info], json_file, indent=4)

    # Download users in cache
    fieldnames = ['name']
    users_cache = api.client.servers.files.get_file_contents(server_id, 'usercache.json')
    user_names = [entry['name'] for entry in users_cache]

    # Add new users only
    users_path = os.path.join(path_folder, 'users.csv')
    # A missing file and an empty file both need the header row
    needs_header = not os.path.exists(users_path) or os.path.getsize(users_path) == 0
    with open(users_path, 'a+', newline='') as csvfile:
        # 'a+' positions the stream at the end: rewind to read what is already stored
        csvfile.seek(0)
        known_names = {row['name'] for row in csv.DictReader(csvfile)}

        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        if needs_header:
            writer.writeheader()
        for name in user_names:
            # The set also filters names repeated within this same batch
            if name not in known_names:
                known_names.add(name)
                writer.writerow({'name': name})

    # Download new log files
    local_logs = {filename for filename in os.listdir(path_folder) if re.match(log_pattern, filename)}
    log_files = api.client.servers.files.list_files(server_id, '/logs/')
    list_logs = []
    for file in log_files['data']:
        remote_name = file['attributes']['name']
        if not re.match(log_pattern, remote_name):
            continue
        # Local copies are stored uncompressed, so compare without the '.gz' suffix
        if remote_name.endswith(extension_file_compressed):
            local_name = remote_name[:-len(gz_suffix)]
        else:
            local_name = remote_name
        if local_name not in local_logs:
            list_logs.append(remote_name)

    # Request each download URL right before using it
    for log in list_logs:
        download_url = api.client.servers.files.download_file(server_id, f'/logs/{log}')
        urllib.request.urlretrieve(download_url, os.path.join(path_folder, log))
    print(f'Files downloaded: {len(list_logs)}')

    # Uncompressing files
    for filename in os.listdir(path_folder):
        if not filename.endswith(extension_file_compressed):
            continue
        compressed_file_path = os.path.join(path_folder, filename)
        decompressed_file_path = os.path.splitext(compressed_file_path)[0] # Remove the .gz extension

        # Uncompress the file in chunks so a large log is never held fully in memory
        with gzip.open(compressed_file_path, 'rb') as compressed_file, \
                open(decompressed_file_path, 'wb') as decompressed_file:
            for chunk in iter(lambda: compressed_file.read(1024 * 1024), b''):
                decompressed_file.write(chunk)

        # Delete the compressed file
        os.remove(compressed_file_path)

## Transformation from data to information

In [None]:
# Column layout of the two output tables
log_columns = ['server_id', 'date', 'time', 'information', 'user', 'activity']
server_columns = ['identifier', 'name', 'node', 'memory', 'disk']

# File names look like yyyy-mm-dd-n.log: capture the date and the sequence number n
log_name_pattern = re.compile(r'^(\d{4}-\d{2}-\d{2})-(\d+)')
# Matches "[date] [..hh:mm:ss..] [thread/LEVEL]: message" and captures date, time, thread and message
pattern = r'\[(\d{4}-\d{2}-\d{2})\] \[.*?(\d{2}:\d{2}:\d{2}).*?\] \[(.*?)/.*?\]: (.*?)\n'

logs_per_server = []
info_per_server = []

for folder in os.listdir(os.path.join(pwd, raw_data_folder)):
    path_folder = os.path.join(pwd, raw_data_folder, folder)
    # Ignore stray files living next to the server folders
    if not os.path.isdir(path_folder):
        continue

    # Collect the dated log files, ordered by date and then by numeric sequence
    # (so that "-10" comes after "-2")
    dated_logs = []
    for log in os.listdir(path_folder):
        name_match = log_name_pattern.match(log)
        if name_match and log.endswith(extension_file_uncompessed):
            dated_logs.append((name_match.group(1), int(name_match.group(2)), log))
    dated_logs.sort()

    # Read all logs as one, prefixing every non-empty line with the date of its file.
    # The date comes from the file name match, so multi-digit sequence numbers work too.
    log_lines = []
    for log_date, _, log_file in dated_logs:
        with open(os.path.join(path_folder, log_file), 'r', encoding='utf-8', errors='replace') as file:
            log_lines.extend(f'[{log_date}] {line}' for line in file.read().split('\n') if line.strip() != "")
    all_logs = "".join(line + "\n" for line in log_lines)

    # Transform the raw text into structured records
    matches = re.findall(pattern, all_logs)

    # Explicit column names keep the frame usable even when nothing matched
    df_logs = pd.DataFrame(matches, columns=['date', 'time', 'category', 'information'])
    df_logs.insert(0, 'server_id', folder)
    # Keep only the "Server thread" logs and drop the category column;
    # copy so the assignments below act on an independent frame
    is_server_thread = df_logs['category'] == 'Server thread'
    df_logs = df_logs.loc[is_server_thread, ['server_id', 'date', 'time', 'information']].copy()
    df_logs['user'] = None
    df_logs['activity'] = None

    # Read the users as plain strings: names such as "null", "NaN" or "12345"
    # must not be converted to missing values or numbers
    df_users = pd.read_csv(os.path.join(path_folder, 'users.csv'), dtype=str, keep_default_na=False)
    user_names = [name for name in df_users['name'] if name]

    information = df_logs['information']

    # Add information when server started
    started = information.str.startswith("Starting minecraft server version", na=False)
    df_logs.loc[started, 'user'] = 'server'
    df_logs.loc[started, 'activity'] = 'start'
    # Add information when server stopped
    stopped = information.str.startswith("Stopping server", na=False)
    df_logs.loc[stopped, 'user'] = 'server'
    df_logs.loc[stopped, 'activity'] = 'stop'

    # Add information when a user did something.
    # The loop variable is not called `user` so the database credential
    # with that name is left untouched.
    for user_name in user_names:
        # Any line starting with the name counts as activity. The trailing space
        # stops "Bob" from claiming the lines of "Bobby".
        acted = information.str.startswith(f'{user_name} ', na=False)
        df_logs.loc[acted, 'user'] = user_name
        df_logs.loc[acted, 'activity'] = 'action'
        # Add information when user login
        joined = information.str.startswith(f'{user_name} joined the game', na=False)
        df_logs.loc[joined, 'user'] = user_name
        df_logs.loc[joined, 'activity'] = 'login'
        # Add information when user logout
        left = information.str.startswith(f'{user_name} left the game', na=False)
        df_logs.loc[left, 'user'] = user_name
        df_logs.loc[left, 'activity'] = 'logout'
        # Delete duplicated activity of login and logout
        duplicated = (
            information.str.startswith(f'{user_name}[', na=False)
            | information.str.startswith(f'{user_name} lost connection: Disconnected', na=False)
        )
        df_logs.loc[duplicated, 'user'] = None
        df_logs.loc[duplicated, 'activity'] = None

    # Rows with no server/user activity are deleted
    df_logs = df_logs.dropna(subset=['activity'])

    # Read JSON file with metadata of the server
    df_server_info = pd.read_json(os.path.join(path_folder, 'metadata.json'), orient='records')

    # Keep the results of this server; everything is concatenated once after the loop
    if not df_logs.empty:
        logs_per_server.append(df_logs)
    info_per_server.append(df_server_info)

# Build the final tables
if logs_per_server:
    df_all_logs = pd.concat(logs_per_server, ignore_index=True)[log_columns]
else:
    df_all_logs = pd.DataFrame(columns=log_columns)

if info_per_server:
    df_all_server_info = pd.concat(info_per_server, ignore_index=True)
    # Base columns first, any additional metadata columns afterwards
    extra_columns = [column for column in df_all_server_info.columns if column not in server_columns]
    df_all_server_info = df_all_server_info.reindex(columns=server_columns + extra_columns)
else:
    df_all_server_info = pd.DataFrame(columns=server_columns)

# Export it in parquet file
os.makedirs(output_folder, exist_ok=True)

df_all_logs.to_parquet(os.path.join(output_folder, 'fact_logs.parquet'), index=False)
df_all_server_info.to_parquet(os.path.join(output_folder, 'dim_server.parquet'), index=False)

## Load processed data into Data Warehouse (Postgres)

In [None]:
# Manually load only fact_logs into the empty table

pwd = os.getcwd() #os.path.dirname(os.path.realpath(__file__))
output_folder = os.path.join(pwd, 'output')

df = pd.read_parquet(os.path.join(output_folder, 'fact_logs.parquet'))
engine = create_engine(connection)
try:
    # engine.begin() opens a transaction that is committed when the block
    # succeeds and rolled back when it raises, so the insert never depends
    # on an implicit commit.
    with engine.begin() as conn:
        df.to_sql(name='activity', schema='pterodactyl', con=conn, if_exists='append', index=False)
finally:
    # Release the pooled connections held by this cell's engine
    engine.dispose()

In [None]:
# Manually load only dim_server into the empty table

pwd = os.getcwd() #os.path.dirname(os.path.realpath(__file__))
output_folder = os.path.join(pwd, 'output')

df = pd.read_parquet(os.path.join(output_folder, 'dim_server.parquet'))
engine = create_engine(connection)
try:
    # engine.begin() opens a transaction that is committed when the block
    # succeeds and rolled back when it raises, so the insert never depends
    # on an implicit commit.
    with engine.begin() as conn:
        df.to_sql(name='servers', schema='pterodactyl', con=conn, if_exists='append', index=False)
finally:
    # Release the pooled connections held by this cell's engine
    engine.dispose()

In [None]:
# THIS IS ONLY A TEMPLATE FROM ANOTHER PROJECT, PLEASE MODIFY AS NEEDED
#
# Sketch of an "insert new rows / replace existing rows" load that runs inside
# one database transaction.
# NOTE(review): ID, TABLE, TABLE_UPDATE, dataframe and date_file are not defined
# in the visible cells of this notebook; they must be provided before this cell can run.
# NOTE(review): table/column names and values are interpolated into the SQL text
# with f-strings; only use trusted values for them.

#for parquet in os.listdir(output_folder):

engine = create_engine(connection)

# Start connection with database
with engine.connect() as conn:
    # Start a new transaction; it is committed or rolled back explicitly below
    trans = conn.begin()

    try:
        # Load the ID column of every row currently stored in TABLE
        print('  - DataBase Loading')
        result = conn.execute(text(f'SELECT "{ID}" FROM {TABLE}'))
        dataBase = pd.DataFrame(result.fetchall(), columns=result.keys())

        # Compare ID: flag the stored IDs that are also present in the incoming dataframe
        print('  - Data Comparison')
        sameID = dataBase[ID].isin(dataframe[ID])

        # Incoming rows whose ID is already stored (toUpdate) and rows whose ID is new (toIngest)
        toUpdate = dataframe[dataframe[ID].isin(dataBase[ID][sameID])]
        toIngest = dataframe[~dataframe[ID].isin(dataBase[ID][sameID])]

        # Append the new rows to TABLE
        print('  - Insert new data to Table')
        toIngest.to_sql(TABLE, conn, if_exists='append', index=False)
        print(f'    - Rows inserted: {toIngest.shape[0]}')

        # Append the rows to be updated to the auxiliary TABLE_UPDATE table
        print('  - Insert updated data to Auxiliary Table')
        toUpdate.to_sql(TABLE_UPDATE, conn, if_exists='append', index=False)
        print(f'    - Rows inserted: {toUpdate.shape[0]}')

        # Replace the outdated rows: delete them from TABLE, copy the fresh
        # versions over from TABLE_UPDATE, then empty TABLE_UPDATE
        print('  - Updating data from Auxiliry Table to Table')
        print('    - Delete rows to be updated in Table')
        conn.execute(text(f'DELETE FROM {TABLE} WHERE "{ID}" IN (SELECT "{ID}" FROM {TABLE_UPDATE});'))
        print('    - Insert updated rows from Auxiliary Table to Table')
        conn.execute(text(f'INSERT INTO {TABLE} SELECT * FROM {TABLE_UPDATE};'))
        print('    - Truncate Auxiliary Table')
        conn.execute(text(f'TRUNCATE TABLE {TABLE_UPDATE};'))

        # Update the date stored in the last_update table (column named after TABLE)
        # when date_file is more recent.
        # NOTE(review): fetchall()[0][0] reads the first row only and raises IndexError
        # if last_update is empty; the column name is quoted in the SELECT but not in the UPDATE.
        print('  - Update date from last_update table')
        old_last_update = conn.execute(text(f'SELECT "{TABLE}" FROM last_update')).fetchall()[0][0]
        new_last_update = date_file
        if new_last_update > old_last_update:
            conn.execute(text(f"UPDATE last_update SET {TABLE} = '{new_last_update}' WHERE {TABLE} = '{old_last_update}';"))

        # Commit the transaction
        trans.commit()
        print('- Transaction commited')
        print('- Disconnecting from the database\n')

    except Exception as e:
        # Roll back every statement of this transaction, report the error and re-raise it
        print('!!! [ERROR IN DATABASE QUERIES] !!!')
        trans.rollback()
        print('Transaction has been rolled back')
        print(f'Error occurred during transaction:\n{e}')
        raise
