# Zadanie rekrutacyjne
##### Plik należy pobrać z serwera, korzystam więc z komendy w terminalu bash 'sftp data-engineer@hiring.cloudtechnologies.dev' a następnie podaje hasło. Kolejnym krokiem jest pobranie pliku komendą 'get data.tsv.gz'. Na koniec zamykam połączenie komendą 'exit'.



### Na początek importuje potrzebne biblioteki

In [None]:

import pandas as pd
import os
import gzip
import shutil
import csv
import re
from datetime import datetime
import statistics
import numpy as np



### Rozpakowuje plik do formatu tsv

In [None]:
input_file = 'data.tsv.gz'
output_file = 'unziped_data.tsv'

# Rozpakowanie pliku
with gzip.open(input_file, 'rb') as f_in:
    with open(output_file, 'wb') as f_out:
        shutil.copyfileobj(f_in, f_out)

print(f"Plik rozpakowany do {output_file}")

### Sprawdzam maksymalną ilość kolumn w pliku

In [None]:
# Definiuje funkcję, sprawdzającą ilość kolumn
def get_max_columns(output_file):
    max_columns = 0
    with open(output_file, 'r', newline='', encoding='utf-8') as file:
        reader = csv.reader(file, delimiter='\t')
        for line in reader:
            num_columns = len(line)
            if num_columns > max_columns:
                max_columns = num_columns
    return max_columns

# Zapisuje do zmiennej wartość z liczbą kolumn
max_columns = get_max_columns(output_file)

print(f"Maksymalna liczba kolumn w pliku: {max_columns}")


### Następnym krokiem będzie oczyszczenie danych
##### Nie mogę bezpośrednio wczytać danych do DataFrame w Pandas, ponieważ dane mają różne wymiary kolumn i typy. Dlatego wybieram opcję oczyszczenia pętlą standardowym Pythonem na początek.

In [None]:
# Ścieżki plików
input_file = 'unziped_data.tsv'
output_file = 'final_cleaned_data.tsv'

# Definiuję funkcję czyszczącą plik wraz z funkcjami warunkowymi
def is_valid_number(value, min_length=14, max_length=22):
    """Sprawdza, czy wartość jest liczbą o długości od min_length do max_length cyfr."""
    return re.match(f'^\d{{{min_length},{max_length}}}$', value) is not None

def is_valid_unix_timestamp(value):
    """Sprawdza, czy wartość jest poprawnym UNIX timestamp (13 cyfr)."""
    return re.match(r'^\d{13}$', value) is not None

def is_valid_url(value):
    """Sprawdza, czy wartość jest poprawnym adresem URL zaczynającym się od http:// lub https://."""
    return re.match(r'^https?://', value) is not None

def is_valid_slash(value):
    """Sprawdza, czy wartość zawiera znak / jako stały element podstron"""
    return '/' in value

def ensure_full_url(value):
    """Dodaje https://www. do adresu, jeśli nie zaczyna się od http:// lub https://"""
    if not is_valid_url(value):
        return f'https://www.{value}'
    return value

def clean_data(input_file, output_file):
    invalid_rows_count = 0
    with open(input_file, 'r', newline='', encoding='utf-8') as infile, \
         open(output_file, 'w', newline='', encoding='utf-8') as outfile:
        
        reader = csv.reader(infile, delimiter='\t')
        writer = csv.writer(outfile, delimiter='\t')

        for line in reader:
            # Maksymalna ilosć kolumn to 4, więc najpierw dla nich wykonuję operacje
            if len(line) == 4:
                col1, col2, col3, col4 = line

                # Krok 1: Sprawdzanie poprawności ID i poprawne umiejscowienie
                if not is_valid_number(col1):
                    if is_valid_number(col2):
                        col1, col2 = col2, col1
                    elif is_valid_number(col3):
                        col1, col3 = col3, col1
                    elif is_valid_number(col4):
                        col1, col4 = col4, col1
                    else:
                        continue    
                
                # 2: Sprawdzanie i poprawne umiejscowienie Unix Timestamp
                if not is_valid_unix_timestamp(col3):
                    if is_valid_unix_timestamp(col2):
                        col3, col2 = col2, col3
                    elif is_valid_unix_timestamp(col4):
                        col3, col4 = col4, col3
                    else:
                        continue    
                
                # 3: Sprawdzanie i poprawianie adresu URL
                if not is_valid_url(col2):
                    if is_valid_url(col4):
                        col2, col4 = col4, col2
                        if is_valid_url(col2):
                            if is_valid_slash(col4):
                                col2 = f'{col2}{col4}'
                        else:
                            col2 = ensure_full_url(col2)
                            if is_valid_slash(col4):
                                col2 = f'{col2}{col4}'
                if is_valid_url(col2):
                    if is_valid_slash(col4):
                            col2 = f'{col2}{col4}'   

                # 4: Czyszczenie kolumny 4 i zapis rekordów
                col4 = ''
                writer.writerow([col1, col2, col3])
                continue

            # Docelowa ilość kolumn to 3, gdy rekord ma mniejszą ilość to dane są wybrakowane, więc zakładam pozbycie się ich  
            elif len(line) == 3:
                col1, col2, col3 = line
                # 1: Czyszczenie kolumny 4 (profilaktycznie)
                col4 = ''
                
                # 2: Sprawdzanie poprawności ID i poprawne umiejscowienie
                if not is_valid_number(col1):
                    if is_valid_number(col2):
                        col1, col2 = col2, col1
                    elif is_valid_number(col3):
                        col1, col3 = col3, col1
                    else:
                        continue    
                
                # 3: Sprawdzanie i poprawne umiejscowienie Unix Timestamp
                if not is_valid_unix_timestamp(col3):
                    if is_valid_unix_timestamp(col2):
                        col3, col2 = col2, col3
                    else:
                        continue    
                
                # 4: Sprawdzanie i poprawianie adresu URL
                if not is_valid_url(col2):
                    if is_valid_url(col3):
                        col2, col3 = col3, col2
                    else:
                        col2 = ensure_full_url(col2)
                        if is_valid_slash(col3):
                            col2 = f'{col2}{col3}'
                        elif is_valid_slash(col1):
                            col2 = f'{col2}{col1}'   

                # 5 Zapisanie poprawionych danych w kolumnach
                writer.writerow([col1, col2, col3])
                continue  

            else:
                invalid_rows_count += 1
                continue  

    print(f"Dane zostały oczyszczone i zapisane do {output_file}")
    print(f"Liczba wierszy z mniej niż 3 kolumnami: {invalid_rows_count}")

# Wywołanie funkcji
clean_data(input_file, output_file)

### Weryfikacja danych po 1 etapie czyszczenia

In [None]:
# Definiuje funkcję liczącą ilość wierszy i porównującą między plikami

def count_rows(file_path):
    """Zlicza liczbę wierszy w pliku."""
    row_count = 0
    with open(file_path, 'r', newline='', encoding='utf-8') as infile:
        reader = csv.reader(infile, delimiter='\t')
        for _ in reader:
            row_count += 1
    return row_count

def compare_row_counts(file1, file2):
    """Porównuje liczbę wierszy w dwóch plikach i zwraca różnicę."""
    count1 = count_rows(file1)
    count2 = count_rows(file2)
    difference = abs(count1 - count2)
    return count1, count2, difference

# Pliki do porównania
file1 = 'unziped_data.tsv'
file2 = 'final_cleaned_data.tsv'

# Porównanie liczby wierszy i obliczenie różnicy
count1, count2, difference = compare_row_counts(file1, file2)

print(f"Liczba wierszy w pliku {file1}: {count1}")
print(f"Liczba wierszy w pliku {file2}: {count2}")
print(f"Ilość zredukowanych błędnych rekordów: {difference}")

### Wczytanie danych do DataFrame 
##### Dane są już w docelowym wymiarze, więc dalsze operacje będę wykonywał przy użyciu Pandas.
##### Danych jest bardzo dużo i takie operacje efektywniej o wiele byłoby wykonać np. na clustrze Sparka. Dalsze działania wykonam na pierwszych 100 000 wierszach, aby pokazać efekty pracy i wykonane zadania, ale można też przy posiadaniu dużej mocy obliczeniowej pominąć ten warunek lub zwiększyć ilość rekordów.

In [None]:
# Wczytanie danych do DataFrame i dodanie nazw kolumn

df = pd.read_csv('final_cleaned_data.tsv', sep='\t', nrows=100000, header = None)

df.columns = ['ID użytkownika', 'Adres URL', 'Unix timestamp']
df.head(20)

In [None]:
# Sprawdzam czy finalny wymiar DataFrame jest poprawny
df.shape

In [None]:
# Sprawdzam typ danych
df.dtypes

### Finalne czyszczenie danych i przygotowanie ich do analizy
##### Na potrzebę wykonania zadania, rozdzielam domeny od podstron i usuwam protokół HTTP oraz oczyszczam z błędów


In [None]:
# Definiuje funkcję usuwającą protokół HTTP
def split_url(url):
    # Regex do wyodrębnienia głównej domeny od podstrony
    match = re.match(r'https?://([^/?#]+)([^?#]*)', url)
    if match:
        domain = match.group(1)
        path = match.group(2)
        return domain, path
    else:
        return url, '' 

# Zastosowanie funkcji do kolumny 'Adres URL'
df[['Domena', 'Podstrona']] = df['Adres URL'].apply(split_url).apply(pd.Series)

# Usunięcie oryginalnej kolumny 'Adres URL'
df = df.drop(columns=['Adres URL'])

# Przearanżowanie kolumn
df = df[['Unix timestamp', 'ID użytkownika', 'Domena', 'Podstrona']]

# Usuwam protokół HTTP
df['Domena'] = df['Domena'].str.replace(r'^https?://', '', regex=True)

# Czyszczę błędne wartości w kolumnie 'Podstrona'
df['Podstrona'] = df['Podstrona'].apply(lambda x: '' if x.strip() == '/' else x)

# Sortowanie danych
df = df.sort_values(by=['ID użytkownika', 'Unix timestamp', 'Domena', 'Podstrona'])


In [None]:
# Weryfikacja typu danych
df.dtypes

In [None]:
# Wyświetlenie pierwszych 10 wierszy
df.head(10)

### Zadanie 1. Liczba unikalnych użytkowników, którzy odwiedzili każdą z domen

In [None]:

task1_df = df.copy()
# Grupowanie po domenie i liczenie unikalnych ID
unique_users = task1_df.groupby('Domena')['ID użytkownika'].nunique().reset_index()

# Zmiana nazw kolumn
unique_users = unique_users.rename(columns={'ID użytkownika': 'Liczba unikalnych użytkowników'})


# Zapisanie wyników do pliku TSV
unique_users.to_csv('unique_users_per_domain.tsv', sep='\t', index=False)

### Zadanie 2. Liczba podstron odwiedzanych przez typowego użytkownika na każdej domenie

In [None]:
task2_df = df.copy()

# Zamiana pustych wartości na 0
task2_df['Podstrona'] = task2_df['Podstrona'].apply(lambda x: 1 if x.strip() != '' else 0)

# Zliczanie liczby podstron
user_page_count = task2_df.groupby(['Domena', 'ID użytkownika'])['Podstrona'].sum().reset_index()

# średni liczba podstron na użytkownika dla każdej domeny
average_pages_per_domain = user_page_count.groupby('Domena')['Podstrona'].mean().reset_index()

# Zmiana nazwy kolumn
average_pages_per_domain = average_pages_per_domain.rename(columns={'Podstrona': 'Średnia liczba podstron na użytkownika'})

# Dodanie kolumny z zaokrągloną liczbą podstron
average_pages_per_domain['Zaokrąglona liczba podstron'] = average_pages_per_domain['Średnia liczba podstron na użytkownika'].round()

# Zapisanie wyników do pliku TSV
average_pages_per_domain.to_csv('average_pages_per_domain.tsv', sep='\t', index=False)

### Zadanie 3. Bounce Rate-procent użytkowników, którzy odwiedzają daną stronę i nie wykonują na niej żadnej akcji

In [None]:
task3_df = df.copy()

# Sortowanie danych
task3_df = task3_df.sort_values(by=['ID użytkownika', 'Unix timestamp', 'Domena', 'Podstrona'])

# Definiuje funkcję do identyfikacji użytkowników nie wykonujących żadnych akcji na podstronach
def get_bounce_users(group):
    unique_users = group['ID użytkownika'].unique()
    bounce_users = set()
    for user in unique_users:
        user_data = group[group['ID użytkownika'] == user]
        if user_data['Podstrona'].eq('').all():
            bounce_users.add(user)
    return bounce_users

# Grupowuje po domenie
grouped = task3_df.groupby('Domena')

# Obliczam liczbę unikalnych użytkowników i użytkowników niewykonujących akcji
bounce_data = []
for domain, group in grouped:
    bounce_users = get_bounce_users(group)
    total_users = group['ID użytkownika'].nunique()
    bounce_users_count = len(bounce_users)
    bounce_data.append({
        'Domena': domain,
        'Liczba unikalnych użytkowników': total_users,
        'Użytkownicy tylko odwiedzający stronę': bounce_users_count
    })

bounce_rate_df = pd.DataFrame(bounce_data)

# Obliczam wskaźnik Bounce Rate
bounce_rate_df['(%) Użytkowników nie wykonujących żadnej akcji na stronie'] = (
    bounce_rate_df['Użytkownicy tylko odwiedzający stronę'] / 
    bounce_rate_df['Liczba unikalnych użytkowników']
) * 100

# Zaokrąglenie do 2 miejsc po przecinku
bounce_rate_df['(%) Użytkowników nie wykonujących żadnej akcji na stronie'] = (
    bounce_rate_df['(%) Użytkowników nie wykonujących żadnej akcji na stronie'].round(2)
)

# Dodawanie %
bounce_rate_df['(%) Użytkowników nie wykonujących żadnej akcji na stronie'] = (
    bounce_rate_df['(%) Użytkowników nie wykonujących żadnej akcji na stronie'].astype(str) + '%'
)

# Zapisanie wyniku do pliku TSV
bounce_rate_df.to_csv('bounce_rate_per_domain2.tsv', sep='\t', index=False)

### Zadanie 4. Średni czas spędzany na domenie przez użytkownika

In [None]:
task4_df = df.copy()

# Sortowanie danych
task4_df = task4_df.sort_values(by=['ID użytkownika', 'Unix timestamp', 'Domena', 'Podstrona'])

# Definiuje funkcję wykrywającą użytkowników, wykonujących akcję na stronie
def get_bounce_users(group):
    unique_users = group['ID użytkownika'].unique()
    no_bounce_users = set()
    
    for user in unique_users:
        user_data = group[group['ID użytkownika'] == user]
        if not user_data['Podstrona'].eq('').all():
            no_bounce_users.add(user)
    
    return no_bounce_users

# Obliczam wskaźnik użytkowników wykonujących akcję na stronie
def get_bounce_rate(df):
    bounce_rate_data = []
    for domain, group in df.groupby('Domena'):
        no_bounce_users = get_bounce_users(group)
        bounce_rate_data.append({
            'Domena': domain,
            'Liczba unikalnych użytkowników': group['ID użytkownika'].nunique(),
            'Użytkownicy wykonujący akcję na stronie': len(no_bounce_users)
        })
    return pd.DataFrame(bounce_rate_data)

no_bounce_rate_df = get_bounce_rate(task4_df)

# Definiuje funkcję liczącą czas spędzony na stronie
def calculate_time_spent(group):
    user_times = []
    for user_id, user_data in group.groupby('ID użytkownika'):
        if len(user_data) > 0:
            start_time = user_data['Unix timestamp'].min()
            end_time = user_data['Unix timestamp'].max()
            time_spent = end_time - start_time
            user_times.append(time_spent)
    return pd.Series({
        'Czas spędzony (ms)': sum(user_times)
    })

# Obliczam czas spędzony na stronie
def get_time_spent(df):
    time_spent_data = []
    for domain, group in df.groupby('Domena'):
        time_spent_data.append({
            'Domena': domain,
            **calculate_time_spent(group)
        })
    return pd.DataFrame(time_spent_data)

time_spent_df = get_time_spent(task4_df)

# Łączę oba DataFrame w jeden
final_df = pd.merge(no_bounce_rate_df, time_spent_df, on='Domena')

# Obliczam średni czas spędzany na stronie (w milisekundach)
final_df['Średni czas spędzany (ms)'] = final_df['Czas spędzony (ms)'] / final_df['Użytkownicy wykonujący akcję na stronie']

# Zapisuje wynik do pliku TSV
final_df.to_csv('average_time_spent_per_domain.tsv', sep='\t', index=False)