In [1]:
from currency_symbols import CurrencySymbols
import string
import re
import os
from tqdm import tqdm
import pandas as pd
from google.cloud import storage
from google.cloud import bigquery
from google.oauth2 import service_account

# Define the regex patterns
emoji_pattern = (
    u"\U0001F600-\U0001F64F"  # emoticons
    u"\U0001F300-\U0001F5FF"  # symbols & pictographs
    u"\U0001F680-\U0001F6FF"  # transport & map symbols
    u"\U0001F1E0-\U0001F1FF"  # flags (iOS)
    u"\U00002702-\U000027B0"
    u"\U000024C2-\U0001F251"
)
accented_characters = u"\u00C0-\u00FF"  # Latin-1 Supplement (accented characters)
special_symbols = '©®™±≠≤≥∞π∑§¶†•′″‰′′←→↑↓↔↕'

# ISO codes for all currencies 
currencies = [
    'AFN', 'EUR', 'ALL', 'DZD', 'USD', 'EUR', 'AOA', 'XCD', 'XCD', 'ARS', 'AMD', 'AWG', 'AUD', 'EUR', 'AZN',
    'BSD', 'BHD', 'BDT', 'BBD', 'BYN', 'EUR', 'BZD', 'XOF', 'BMD', 'INR', 'BTN', 'BOB', 'BOV', 'USD', 'BAM',
    'BWP', 'NOK', 'BRL', 'USD', 'BND', 'BGN', 'XOF', 'BIF', 'CVE', 'KHR', 'XAF', 'CAD', 'KYD', 'XAF', 'XAF',
    'CLP', 'CLF', 'CNY', 'AUD', 'AUD', 'COP', 'COU', 'KMF', 'CDF', 'XAF', 'NZD', 'CRC', 'XOF', 'EUR', 'CUP',
    'CUC', 'ANG', 'EUR', 'CZK', 'DKK', 'DJF', 'XCD', 'DOP', 'USD', 'EGP', 'SVC', 'USD', 'XAF', 'ERN', 'EUR',
    'SZL', 'ETB', 'EUR', 'FKP', 'DKK', 'FJD', 'EUR', 'EUR', 'EUR', 'XPF', 'EUR', 'XAF', 'GMD', 'GEL', 'EUR',
    'GHS', 'GIP', 'EUR', 'DKK', 'XCD', 'EUR', 'USD', 'GTQ', 'GBP', 'GNF', 'XOF', 'GYD', 'HTG', 'USD', 'AUD',
    'EUR', 'HNL', 'HKD', 'HUF', 'ISK', 'INR', 'IDR', 'XDR', 'IRR', 'IQD', 'EUR', 'GBP', 'ILS', 'EUR', 'JMD',
    'JPY', 'GBP', 'JOD', 'KZT', 'KES', 'AUD', 'KPW', 'KRW', 'KWD', 'KGS', 'LAK', 'EUR', 'LBP', 'LSL', 'ZAR',
    'LRD', 'LYD', 'CHF', 'EUR', 'EUR', 'MOP', 'MKD', 'MGA', 'MWK', 'MYR', 'MVR', 'XOF', 'EUR', 'USD', 'EUR',
    'MRU', 'MUR', 'EUR', 'XUA', 'MXN', 'MXV', 'USD', 'MDL', 'EUR', 'MNT', 'EUR', 'XCD', 'MAD', 'MZN', 'MMK',
    'NAD', 'ZAR', 'AUD', 'NPR', 'EUR', 'XPF', 'NZD', 'NIO', 'XOF', 'NGN', 'NZD', 'AUD', 'USD', 'NOK', 'OMR',
    'PKR', 'USD', 'PAB', 'USD', 'PGK', 'PYG', 'PEN', 'PHP', 'NZD', 'PLN', 'EUR', 'USD', 'QAR', 'EUR', 'RON',
    'RUB', 'RWF', 'EUR', 'SHP', 'XCD', 'XCD', 'EUR', 'EUR', 'XCD', 'WST', 'EUR', 'STN', 'SAR', 'XOF', 'RSD',
    'SCR', 'SLE', 'SGD', 'ANG', 'XSU', 'EUR', 'EUR', 'SBD', 'SOS', 'ZAR', 'SSP', 'EUR', 'LKR', 'SDG', 'SRD',
    'NOK', 'SEK', 'CHF', 'CHE', 'CHW', 'SYP', 'TWD', 'TJS', 'TZS', 'THB', 'USD', 'XOF', 'NZD', 'TOP', 'TTD',
    'TND', 'TRY', 'TMT', 'USD', 'AUD', 'UGX', 'UAH', 'AED', 'GBP', 'USD', 'USD', 'USN', 'UYU', 'UYI', 'UYW',
    'UZS', 'VUV', 'VES', 'VED', 'VND', 'USD', 'USD', 'XPF', 'MAD', 'YER', 'ZMW', 'ZWL', 'ZWG', 'XBA', 'XBB',
    'XBC', 'XBD', 'XTS', 'XXX', 'XAU', 'XPD', 'XPT', 'XAG'
]

currency_symbols = [CurrencySymbols.get_symbol(x) for x in currencies]
currency_symbols = [x for x in currency_symbols if x and not x.isalpha()]  # Filter out None values and alphabetic characters

pattern_string = (
    r'[^a-zA-Z0-9\s' +
    ''.join([re.escape(x) for x in string.punctuation]) +
    ''.join([re.escape(x) for x in currency_symbols]) +
    special_symbols +
    emoji_pattern +
    accented_characters +
    ']'
)

pattern = re.compile(pattern_string, re.UNICODE)

def clean_dataframe(df):
    df_cleaned = df[~df.apply(lambda row: row.astype(str).str.contains(pattern).any(), axis=1)]
    df_cleaned['text'] = df_cleaned['text'].str.replace('"', ' ')
    df_cleaned['text'] = df_cleaned['text'].str.strip()
    df_cleaned['text'] = df_cleaned['text'].str.rstrip()
    escaped_chars_pattern = re.compile(r'[' + ''.join([re.escape(c) for c in ['\\', '\t', '\n']]) + ']')
    df_cleaned['text'] = df_cleaned['text'].str.replace(escaped_chars_pattern, ' ', regex=True)
    excessive_whitespace_pattern = re.compile(r'\s{2,}')
    df_cleaned['text'] = df_cleaned['text'].str.replace(excessive_whitespace_pattern, ' ', regex=True)
    df_cleaned.drop(['date', 'hard_cleaned_text', 'soft_cleaned_text'], axis=1, inplace=True)

    return df_cleaned


def process_and_clean_csv(input_path):
    df = pd.read_csv(input_path, header=0, index_col=False, encoding='utf-8', on_bad_lines='skip')
    df_cleaned = clean_dataframe(df)
    return df_cleaned

def upload_file(bucket_name, source_file_path, destination_blob_name):
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(destination_blob_name)
    storage.blob._DEFAULT_CHUNKSIZE = 100 * 1024 * 1024  # 100 MB chunk size
    storage.blob._MAX_MULTIPART_SIZE = 100 * 1024 * 1024  # 100 MB max multipart size
    blob.upload_from_filename(source_file_path, timeout=600)
    print(f'{source_file_path} uploaded to {bucket_name} as {destination_blob_name}.')

def load_to_bigquery(df, table_ref):
    job = bigquery_client.load_table_from_dataframe(df, table_ref)
    job.result()
    print(f'Loaded DataFrame into {table_ref.table_id}.')

# Service account key
key_path = '/Users/chkapsalis/Downloads/nlp-project-427710-3e1a48df3dba.json'
credentials = service_account.Credentials.from_service_account_file(key_path)

# Google Cloud project id and dataset information
project_id = 'nlp-project-427710'
dataset_id = 'crypto'  # Replace with your dataset ID
table_id = 'btc1'  # Replace with your table ID

# Initialization of the BigQuery client
bigquery_client = bigquery.Client(project=project_id, credentials=credentials)
storage_client = storage.Client(project=project_id, credentials=credentials)

# Create the dataset if it does not exist
dataset_ref = bigquery_client.dataset(dataset_id)
dataset = bigquery.Dataset(dataset_ref)

try:
    bigquery_client.get_dataset(dataset_ref)  # Make an API request.
    print(f"Dataset {dataset_id} already exists.")
except:
    dataset = bigquery_client.create_dataset(dataset)  # Make an API request.
    print(f"Dataset {dataset_id} created.")

# Define the job configuration for loading data
job_config = bigquery.LoadJobConfig(
    schema=[
        bigquery.SchemaField("user_followers", "DECIMAL"),
        bigquery.SchemaField("user_verified", "INTEGER"),
        bigquery.SchemaField("text", "STRING"),
        bigquery.SchemaField("vader_sentiment", "DECIMAL"),
        bigquery.SchemaField("afinn_sentiment", "DECIMAL"),
        bigquery.SchemaField("sentiment", "DECIMAL")
    ],
    source_format=bigquery.SourceFormat.CSV,
    skip_leading_rows=1,
    field_delimiter='|',  # used this custom delimiter to help make sense out of the data
    autodetect=False,  # Automatically detect the schema
    max_bad_records=2000,  # Allow up to 2000 bad records
    ignore_unknown_values=True  # Ignore unknown values
)

source_folder = '/Users/chkapsalis/Downloads/btc1'
cleaned_folder = '/Users/chkapsalis/Downloads/cleaned_btc1'

os.makedirs(cleaned_folder, exist_ok=True)

for filename in tqdm(os.listdir(source_folder)):
    if filename.endswith('.csv'):
        source_file = os.path.join(source_folder, filename)
        df_cleaned = process_and_clean_csv(source_file)

        # making sure that the dtypes of columns match what asserted in the schema
        df_cleaned['user_followers'] = pd.to_numeric(df_cleaned['user_followers'], errors='coerce')

        
        # Upload the cleaned DataFrame to BigQuery
        #load_to_bigquery(df_cleaned, dataset_ref.table(table_id))
        load_to_bigquery(df_cleaned.iloc[:1000000, ], dataset_ref.table(table_id))
        load_to_bigquery(df_cleaned.iloc[1000000:3000000, ], dataset_ref.table(table_id))

  from pandas.core import (


Dataset crypto already exists.


  df = pd.read_csv(input_path, header=0, index_col=False, encoding='utf-8', on_bad_lines='skip')
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df_cleaned['text'] = df_cleaned['text'].str.replace('"', ' ')
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df_cleaned['text'] = df_cleaned['text'].str.strip()
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-c

ArrowInvalid: Could not convert '304.0' with type str: tried to convert to double

In [2]:
df_cleaned

Unnamed: 0,user_followers,user_verified,text,vader_sentiment,afinn_sentiment,sentiment
3,625.0,0,$BTC A big chance in a billion! Price: 4872644...,0.2500,0.299145,0.279487
6,131.0,0,&lt;'fire' &amp; 'man'&gt; #Bitcoin #Crypto #B...,-0.0772,0.213675,0.097325
10,1159.0,0,Annnd #btc #Bitcoin is headed even higher now....,0.0000,0.247863,0.148718
13,668.0,0,#Bitcoin #BTC $BTC $GBTC $RIOT $MARA $ETH $ETH...,-0.5574,0.213675,-0.094755
14,1281.0,0,⬆️⬆️ $BTC BUYING PRESSURE ALERT 📈 Price tradin...,0.0000,0.213675,0.128205
...,...,...,...,...,...,...
94843179,66.0,0,#DogelonMars is the future. #TSUKA is the next...,0.3182,0.299145,0.306767
94843180,674.0,0,"Bitcoin squeeze is SUPER TIGHT, which way will...",0.5994,0.299145,0.419247
94843181,79.0,0,Closed #BTC short at 16725. Missed my long pla...,-0.6124,0.213675,-0.116755
94843182,532.0,0,#Ethereum price update: #ETH $1263.59 USD #Bit...,0.0000,0.247863,0.148718


In [3]:
df_cleaned.info()

<class 'pandas.core.frame.DataFrame'>
Index: 94009455 entries, 3 to 94843184
Data columns (total 6 columns):
 #   Column           Dtype  
---  ------           -----  
 0   user_followers   object 
 1   user_verified    object 
 2   text             object 
 3   vader_sentiment  float64
 4   afinn_sentiment  float64
 5   sentiment        float64
dtypes: float64(3), object(3)
memory usage: 4.9+ GB


In [4]:
df_cleaned['user_followers'] = pd.to_numeric(df_cleaned['user_followers'], errors='coerce')


In [5]:
df_cleaned.info()

<class 'pandas.core.frame.DataFrame'>
Index: 94009455 entries, 3 to 94843184
Data columns (total 6 columns):
 #   Column           Dtype  
---  ------           -----  
 0   user_followers   float64
 1   user_verified    object 
 2   text             object 
 3   vader_sentiment  float64
 4   afinn_sentiment  float64
 5   sentiment        float64
dtypes: float64(4), object(2)
memory usage: 4.9+ GB


In [7]:
load_to_bigquery(df_cleaned.iloc[:1000000, ], dataset_ref.table(table_id))

Loaded DataFrame into btc1.


In [8]:
load_to_bigquery(df_cleaned.iloc[1000000:3000000, ], dataset_ref.table(table_id))

Loaded DataFrame into btc1.
