# Requirements

# Imports

In [19]:
import internetarchive
import os
import tarfile
import csv
from datetime import datetime
from tqdm.notebook import tqdm
import pandas as pd
import configparser
import mysql.connector
import json

#Hadoop
from hdfs import InsecureClient


In [20]:
import warnings

# Suppressing the warnings
warnings.filterwarnings('ignore') 

# Functions

## General

In [21]:
def check_folder(name_folder):
    """Return the path of ``name_folder`` under the current working directory.

    The folder (including missing parents) is created when it does not exist
    yet, and a message with the created path is printed in that case.

    Args:
        name_folder: Folder name (or relative sub-path) appended to the
            current working directory.

    Returns:
        The folder path as a string, built as ``<cwd>/<name_folder>``.
    """
    # os.getcwd() replaces the IPython-only ``%pwd`` magic, so the function is
    # valid Python and also works outside a notebook.
    path = os.path.expanduser(os.getcwd() + f'/{name_folder}')

    # Creating folder if that doesn't exist
    if not os.path.exists(path):
        os.makedirs(path)
        print("{} created.".format(path))

    return path

In [22]:
def get_file_name(item_name, ext='*tar'):
    """List the names of the files of an item that match a glob pattern.

    Args:
        item_name: Identifier passed to ``internetarchive.get_files``.
        ext: Glob pattern forwarded as ``glob_pattern`` (default ``'*tar'``).

    Returns:
        A list with the ``name`` attribute of every file returned.
    """
    matches = internetarchive.get_files(item_name, glob_pattern=ext)

    names = []
    for entry in matches:
        names.append(entry.name)

    return names

In [23]:
def humanize(size_bytes):
    """Format a byte count as a short string in B, KiB, MiB or GiB.

    Values below 1024 are shown as-is with the ``B`` suffix; larger values are
    divided by the matching power of 1024 and shown with two decimals.
    Anything from 1 GiB upwards is expressed in GiB.
    """
    if size_bytes < 1 << 10:
        return '{} B'.format(size_bytes)

    # (exclusive upper bound, divisor, output template)
    scales = (
        (1 << 20, 1 << 10, '{:.2f} KiB'),
        (1 << 30, 1 << 20, '{:.2f} MiB'),
    )
    for ceiling, divisor, template in scales:
        if size_bytes < ceiling:
            return template.format(size_bytes / divisor)

    return '{:.2f} GiB'.format(size_bytes / (1 << 30))


## Cleaning and Transformations

In [24]:
def get_data(file_path, file_name, word_bag = False, lang = False, db_update = False):
    """Load a gzipped JSON-lines file and reduce it to the columns of interest.

    Args:
        file_path: Path of the gzip-compressed, line-delimited JSON file.
        file_name: Value matched against ``control.name`` when the row counts
            are written back to the database.
        word_bag: Iterable of words; they are joined with ``|`` and used as a
            case-insensitive regular expression against ``text``. ``False``
            disables this filter.
        lang: Value the ``lang`` column must equal; ``False`` keeps every row.
        db_update: When truthy, the total row count (and the filtered row
            count, if ``word_bag`` is given) is stored through ``update_db``.

    Returns:
        A DataFrame with the columns ``created_at``, ``text`` and ``entities``.
    """
    # reading json
    df = pd.read_json(file_path, lines=True, compression='gzip')

    # updating total tweets.
    # The flag is the ``db_update`` parameter; the previous code tested the
    # ``update_db`` function object, which made the check always pass.
    if db_update:
        update_db(table = 'control', field = f"count_total = '{len(df)}'", condition = f"name = '{file_name}'")

    # Filtering language if a language was sent
    if lang is not False:
        df = df[df.lang == lang]

    # taking away unnecessary columns
    df = df.loc[:, ['created_at', 'text', 'entities']]

    # filtering Tweets using subject word bag if a word bag was sent
    if word_bag is not False:
        bow = '|'.join(word_bag) # bag of words, used as a regex alternation
        # na=False: rows whose text is missing count as "no match" instead of
        # producing NaN in the boolean mask.
        df = df[df['text'].str.contains(bow, case=False, na=False)]

        # updating filtered tweets
        if db_update:
            update_db(table = 'control', field = f"count_filtered = '{len(df)}'", condition = f"name = '{file_name}'")

    return df

## Control Operations

### CSV

In [25]:
def control_op_csv(control_file):
    """Return the rows of a CSV control file, creating the file if needed.

    When ``control_file`` does not exist it is created with the header
    ``name,datetime`` and a message is printed.

    Args:
        control_file: Path of the CSV control file.

    Returns:
        A list with one dict per data row, as produced by ``csv.DictReader``.
    """
    columns = ['name', 'datetime']

    # First use: write a header-only file so the read below always succeeds
    if not os.path.exists(control_file):
        with open(control_file, 'w', newline='') as handle:
            csv.DictWriter(handle, fieldnames=columns).writeheader()
        print("{} created.".format(control_file))

    # Reading the control file
    with open(control_file, 'r', newline='') as handle:
        return list(csv.DictReader(handle))

In [26]:
def download_file_csv(item_name, file_name, download_folder = 'Downloads'):
    """Download one file of an item unless the CSV control file lists it.

    The control file ``control_download.csv`` lives inside the download
    folder. When ``file_name`` is not listed there, the file is downloaded
    and a row with its name and the current timestamp is appended.

    Args:
        item_name: Identifier passed to ``internetarchive.get_item``.
        file_name: Name of the file to download from the item.
        download_folder: Folder (relative to the working directory) that
            receives the download and holds the control file.

    Returns:
        ``<download folder>/<file_name>`` as a string, whether or not a
        download happened in this call.
    """
    path = check_folder(download_folder)

    # calling function for getting control file
    control_path = path + '/control_download.csv'
    reader_data = control_op_csv(control_path)

    # check if the file has been downloaded before
    if not any(row['name'] == file_name for row in reader_data):

        # The item is only looked up when a download is actually needed, so a
        # file that is already recorded costs no network round trip.
        item = internetarchive.get_item(item_name)

        # downloading
        item.download(
            destdir=path,  # The directory to download files to
            ignore_existing=True,  # Skip files that already exist locally
            checksum=True,  # Skip files based on checksum
            verbose=True,  # Print progress to stdout
            retries=100,  # The number of times to retry on failed requests
            no_directory=True,  # Download without the identifier
            files = file_name)

        # Adding file name on control list. The control file is opened only
        # now, so it is not held open for the duration of the download.
        with open(control_path, 'a', newline='') as control_download:
            writer = csv.DictWriter(control_download, fieldnames=['name','datetime'])
            writer.writerow({'name' : file_name, 'datetime': datetime.now().strftime('%Y-%m-%d %H:%M:%S')})

    path_file = path + f'/{file_name}'

    return path_file

In [27]:
def tar_file_csv(tar_file, file_name, extration_folder='Extraction'):
    """Extract one member of a tar archive unless the CSV control file lists it.

    The control file ``control_extraction.csv`` lives inside the extraction
    folder. When ``file_name`` is not listed there, the member is extracted
    and a row with its name and the current timestamp is appended.

    Args:
        tar_file: Open archive object providing ``extract(name, path=...)``.
        file_name: Name of the member to extract.
        extration_folder: Folder (relative to the working directory) that
            receives the extracted file and holds the control file.

    Returns:
        ``<extraction folder>/<file_name>`` when the member was extracted in
        this call, ``False`` when it was already listed in the control file.
    """
    path = check_folder(extration_folder)

    # calling function for getting control file
    # (the helper defined in this file is ``control_op_csv``; the previous
    # call to ``control_op`` referenced a name that does not exist here)
    control_path = path + '/control_extraction.csv'
    reader_data = control_op_csv(control_path)

    # check if the file has been extracted before
    if any(row['name'] == file_name for row in reader_data):
        return False

    # Extracting file
    # NOTE(review): members are extracted as-is; confirm the archives come
    # from a trusted source.
    tar_file.extract(file_name, path=path)

    # Saving file name on control file
    with open(control_path, 'a', newline='') as control_tar:
        writer = csv.DictWriter(control_tar, fieldnames=['name','datetime'])
        writer.writerow({'name' : file_name, 'datetime': datetime.now().strftime('%Y-%m-%d %H:%M:%S')})

    return path + f'/{file_name}'

### MySQL

#### Basic operations

In [28]:
def open_connection():
    """Open a MySQL connection using the ``[mysql]`` section of ``config.ini``.

    Returns:
        The connection object returned by ``mysql.connector.connect``.
    """
    # getting configuration
    parser = configparser.ConfigParser()
    parser.read('config.ini')
    settings = parser['mysql']

    # Connect to MySQL with the host, user, password and database read above
    return mysql.connector.connect(
        host=settings['host'],
        user=settings['user'],
        password=settings['password'],
        database=settings['database'],
    )

In [29]:
def update_db(table, field, condition):
    """Run ``UPDATE <table> SET <field> WHERE <condition>`` and commit it.

    Args:
        table: Table name inserted into the statement.
        field: Assignment text placed after ``SET`` (e.g. ``"col = 'v'"``).
        condition: Text placed after ``WHERE``.

    NOTE: the three arguments are interpolated into the SQL text unchanged,
    so they must never contain untrusted input.
    """
    conn = open_connection()
    try:
        cursor = conn.cursor()
        try:
            cursor.execute(f"UPDATE {table} SET {field} WHERE {condition};")
            conn.commit()
        finally:
            cursor.close()
    finally:
        # Always release the connection, even when the statement fails
        conn.close()

In [30]:
def query_db(query, params=None):
    """Execute a single SQL statement and commit it.

    Args:
        query: SQL text to execute.
        params: Optional values forwarded to ``cursor.execute`` so that the
            driver binds them to the placeholders in ``query``. ``None``
            (the default) executes ``query`` as-is.
    """
    conn = open_connection()
    try:
        cursor = conn.cursor()
        try:
            cursor.execute(query, params)
            conn.commit()
        finally:
            cursor.close()
    finally:
        # Always release the connection, even when the statement fails
        conn.close()

In [31]:
def read_db(query, params=None):
    """Run a SELECT and return its result as a DataFrame.

    Args:
        query: SQL text to execute.
        params: Optional values forwarded to ``pd.read_sql_query`` for the
            placeholders in ``query``. ``None`` (the default) runs ``query``
            as-is.

    Returns:
        The DataFrame produced by ``pd.read_sql_query``.
    """
    conn = open_connection()
    try:
        return pd.read_sql_query(query, conn, params=params)
    finally:
        # Always release the connection, even when the query fails
        conn.close()

#### Specific operations

In [32]:
def check_control_db(file_type, file_name = False):
    """Look up entries of the ``control`` table for a given type.

    Args:
        file_type: Value matched against the ``type`` column.
        file_name: Optional value matched against the ``name`` column.

    Returns:
        Without ``file_name``: a list of ``{'name': ...}`` records, one per
        row of that type. With ``file_name``: ``True`` when at least one
        matching row exists, otherwise ``False``.
    """
    # No name given: return every recorded name of this type
    if file_name == False:
        listing = read_db(f"SELECT name FROM control WHERE type ='{file_type}'")
        return listing.to_dict(orient='records')

    # Name given: only report whether it is already recorded
    matches = read_db(
        f"SELECT name FROM control WHERE type ='{file_type}' AND name = '{file_name}'"
    ).to_dict(orient='records')

    return len(matches) > 0

In [33]:
def download_file_db(item_name, file_name, download_folder = 'Downloads', type_control = 'download'):
    """Download one file of an item unless the control table already lists it.

    Args:
        item_name: Identifier passed to ``internetarchive.get_item``.
        file_name: Name of the file to download from the item.
        download_folder: Folder (relative to the working directory) that
            receives the download.
        type_control: Value stored in / matched against ``control.type``.

    Returns:
        ``<download folder>/<file_name>`` when the file was downloaded in this
        call, ``False`` when the control table already had an entry for it.
    """
    path = check_folder(download_folder)

    # Ask MySQL whether this file was already downloaded
    already_recorded = check_control_db(file_type = type_control, file_name = file_name)
    if already_recorded != False:
        return False

    # getting the item
    item = internetarchive.get_item(item_name)

    # downloading
    item.download(
        destdir=path,  # The directory to download files to
        ignore_existing=True,  # Skip files that already exist locally
        checksum=True,  # Skip files based on checksum
        verbose=False,  # Do not print progress to stdout
        retries=100,  # The number of times to retry on failed requests
        no_directory=True,  # Download without the identifier
        files = file_name)

    # Metadata entries of the item whose name is the requested file
    file_entries = [entry for entry in item.item_metadata['files'] if entry["name"] == file_name]

    # Recording the download (name, timestamp, type and size) in the control table
    dt = datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')
    sz = file_entries[0]['size']
    query_db(f"INSERT INTO control (name, datetime, type, size) VALUES ('{file_name}','{dt}','{type_control}','{sz}')")

    return path + f'/{file_name}'

In [34]:
def tar_file_db(tar_file, file_name, extration_folder='Extraction', type_control = 'extraction'):
    """Extract one archive member unless the control table already lists it.

    Args:
        tar_file: Open archive object providing ``extract`` and ``getmember``.
        file_name: Name of the member to extract.
        extration_folder: Folder (relative to the working directory) that
            receives the extracted file.
        type_control: Value stored in / matched against ``control.type``.

    Returns:
        ``<extraction folder>/<file_name>`` when the member was extracted in
        this call, ``False`` when the control table already had an entry.
    """
    path = check_folder(extration_folder)

    # Ask MySQL whether this member was already extracted
    already_recorded = check_control_db(file_type = type_control, file_name = file_name)
    if already_recorded != False:
        return False

    # Extracting file
    tar_file.extract(file_name, path=path)

    # Recording the extraction (name, timestamp, type and size) in the control table
    dt = datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')
    sz = tar_file.getmember(name=file_name).size
    query_db(f"INSERT INTO control (name, datetime, type, size) VALUES ('{file_name}', '{dt}', '{type_control}', '{sz}')")

    return path + f'/{file_name}'

# Checking Data

## Getting info

In [42]:
# All control rows recorded for extracted files
df = read_db("SELECT * FROM control WHERE type = 'extraction'")
# With no file name given, check_control_db returns a list of {'name': ...}
# records for every control row of type 'download'
down_df = check_control_db(file_type = 'download')

In [43]:
# The first 8 characters of ``name`` hold the day as YYYYMMDD. Parse them in
# one vectorized call instead of a per-row strptime/strftime round trip
# followed by a second parse.
df['data'] = pd.to_datetime(df['name'].str[:8], format='%Y%m%d')

# Numeric columns are cast to float so they can be aggregated below
df['size'] = df['size'].astype(float)
df['count_total'] = df['count_total'].astype(float)
df['count_filtered'] = df['count_filtered'].astype(float)

In [68]:
df

Unnamed: 0,idnew_table,name,datetime,type,size,count_total,count_filtered,data
0,660,20221001/20221001235900.json.gz,2023-04-30 02:06:52,extraction,1734723.0,2558.0,0.0,2022-10-01
1,661,20221001/20221001235500.json.gz,2023-04-30 02:06:53,extraction,1740434.0,2558.0,0.0,2022-10-01
2,662,20221001/20221001235300.json.gz,2023-04-30 02:06:54,extraction,1738464.0,2539.0,0.0,2022-10-01
3,663,20221001/20221001235600.json.gz,2023-04-30 02:06:54,extraction,1844137.0,2753.0,0.0,2022-10-01
4,664,20221001/20221001235700.json.gz,2023-04-30 02:06:55,extraction,1720647.0,2565.0,1.0,2022-10-01
...,...,...,...,...,...,...,...,...
514978,516060,20220131/20220131000200.json.gz,2023-05-07 19:42:34,extraction,1651805.0,2453.0,3.0,2022-01-31
514979,516061,20220131/20220131001500.json.gz,2023-05-07 19:42:35,extraction,1511317.0,2317.0,4.0,2022-01-31
514980,516062,20220131/20220131000300.json.gz,2023-05-07 19:42:36,extraction,1632604.0,2432.0,3.0,2022-01-31
514981,516063,20220131/20220131000000.json.gz,2023-05-07 19:42:37,extraction,1663402.0,2568.0,2.0,2022-01-31


In [62]:
# Earliest and latest control timestamp ('datetime') among the files of each
# calendar month of data ('data' is the day parsed from the file name)
grouped_df = df.groupby(pd.Grouper(key='data', freq='M'))['datetime'].agg(['min', 'max'])


# Whole days elapsed between the first and the last timestamp of each month
grouped_df['diff_days'] = (grouped_df['max'] - grouped_df['min']).dt.days


print(f"\n Mean:{grouped_df['diff_days'].mean()} days per month")


 Mean:2.75 days per month


In [74]:
# Earliest and latest control timestamp ('datetime') among the files of each
# calendar day of data ('data' is the day parsed from the file name)
grouped_df = df.groupby(pd.Grouper(key='data', freq='D'))['datetime'].agg(['min', 'max'])


# Whole minutes between the first and the last timestamp of each day.
# ``.dt.seconds`` only holds the sub-day part of a timedelta, so the day
# component is added explicitly; otherwise spans of 24 hours or more wrap.
grouped_df['diff_min'] = (
    (grouped_df['max'] - grouped_df['min']).dt.days * 24 * 60
    + (grouped_df['max'] - grouped_df['min']).dt.seconds // 60
)

# Time between the very first and the very last recorded timestamp
days_total = (grouped_df['max'].max() - grouped_df['min'].min())

print(f"\n Mean: {grouped_df['diff_min'].mean()} minutes per day")
print(f"\n Max: {grouped_df['diff_min'].max()} minutes per day")
print(f"\n Min: {grouped_df['diff_min'].min()} minutes per day")

print(f"\n Diff Total: {days_total}")


 Mean: 47.02739726027397 minutes per day

 Max: 1244 minutes per day

 Min: 14 minutes per day

 Diff Total: 7 days 23:07:39


In [52]:
# Average of the per-file total counter
print(f"{df['count_total'].mean():,.0f} tweets per file")

# Number of control rows per calendar day of data, averaged over the days
# (despite its name, min_day holds a mean, not a minimum)
min_day = (df.groupby(pd.Grouper(key='data', freq='D'))['data'].count()).mean()

print(f"{min_day:,.0f} json file per day")


2,742 tweets per file
1,411 json file per day


In [37]:
# Bytes per calendar month of data; only the grand total is used below
donwloaded = df.groupby(pd.Grouper(key='data', freq='M'))['size'].sum()
# len(down_df) is reported as the number of days
# (presumably one 'download' control row per day - TODO confirm)
print(f"Total of days: {len(down_df)} \n")

print(f"Total Downloaded: {humanize(donwloaded.sum())}")
print(f"An average of {humanize(donwloaded.sum()/len(down_df))} per day downloaded \n")

# The counters were cast to float earlier, so the totals print with a decimal
print("Total of Tweets: {:,}".format(df['count_total'].sum()))
print(f"An average of {df['count_total'].sum()/len(down_df):,.0f} tweets downloaded per day \n")

print("Tweets filtered: {:,}".format(df['count_filtered'].sum()))
print(f"An average of {df['count_filtered'].sum()/len(down_df):,.0f} tweets filtered per day \n")


# Drop the large objects that are no longer needed
del down_df, donwloaded, df

Total of days: 365 

Total Downloaded: 917.96 GiB
An average of 2.51 GiB per day downloaded 

Total of Tweets: 1,412,214,396.0
An average of 3,869,081 tweets downloaded per day 

Tweets filtered: 763,267.0
An average of 2,091 tweets filtered per day 



## Checking Duplicates

In [38]:
# Every name recorded in the control table, regardless of type
names_control = read_db("SELECT name FROM control")

In [39]:
# Rows whose name already appeared earlier in the control table
duplicated = names_control.duplicated(['name'])
df_duplicated = names_control.loc[duplicated, :]
if len(df_duplicated) > 0:
    print(f'Find {len(df_duplicated)} duplicated')
    # Print the frame built above; the previous name ``df_duplicados`` was
    # never defined and raised NameError exactly when duplicates existed.
    print(df_duplicated)
else:
    print("Don't find duplicated")


# Drop the objects that are no longer needed
del duplicated, df_duplicated, names_control

Don't find duplicated


# Importing to Hadoop 

In [40]:
def mysql_to_hdfs(path, mysql_table, hdfs_folder = '/CA4' ,split_by_month=False, date_col='created_at',
                  hdfs_url='http://localhost:9870', hdfs_user='hduser'):
    """Export a MySQL table to parquet and upload the export folder to HDFS.

    Args:
        path: Local folder (relative to the working directory) that receives
            the parquet file(s); it is created when missing.
        mysql_table: Name of the table to export. It is interpolated into the
            SQL text unchanged, so it must not come from untrusted input.
        hdfs_folder: Destination path passed to the HDFS client upload.
        split_by_month: When truthy, one parquet file is written per month
            number (1-12), named ``<table>_<month>.parquet``. Rows from
            different years share the file of their month number.
        date_col: Column used to derive the month when splitting.
        hdfs_url: URL handed to ``InsecureClient``.
        hdfs_user: User name handed to ``InsecureClient``.
    """
    path = check_folder(path)

    # Loading data from MySQL database
    df = read_db(f"SELECT * FROM {mysql_table}")

    if split_by_month:

        # Adding month. ``utc=True`` treats naive timestamps as UTC, as
        # before, and converts already tz-aware ones instead of failing.
        df[date_col] = pd.to_datetime(df[date_col], utc=True)
        df['month'] = df[date_col].dt.month

        # Splitting DataFrame by month and saving in parquet files
        for month in range(1, 13):
            month_df = df[df['month'] == month]
            month_df.to_parquet(f"{path}/{mysql_table}_{month}.parquet")
            print(f"{mysql_table}_{month}.parquet >>> SAVED AT >>> {path}")

    else:
        df.to_parquet(f"{path}/{mysql_table}.parquet")

    # Create a client
    client = InsecureClient(hdfs_url, user=hdfs_user)

    # Copy the whole local export folder to HDFS
    client.upload(hdfs_folder, path)
    print(f"{path} >>> UPLOADED AT >>> {hdfs_folder}")
    

## Control Table

## Tweets Table