# Twitter Data Processing Pipeline with Dask and Incremental Writes


## Introduction

This notebook provides a complete pipeline for processing tweet data, optimized for large datasets using Dask. 
It also includes incremental writes to a CSV file to save progress.


In [1]:
import warnings
import pandas as pd
import json
import csv
import requests
from typing import Dict, List, Any
import logging
import numpy as np
import concurrent.futures
import os

In [3]:
warnings.filterwarnings(action='ignore', category=FutureWarning)
# warnings.filterwarnings(action='ignore', category=FutureWarning, module='pyspark')

In [4]:
# Initialize Spark session
# Configure logging
logging.basicConfig(level=logging.INFO)
logging.basicConfig(filename='processing.log', level=logging.INFO)
# spark = SparkSession.builder \
#     .appName("Twitter Data Processing") \
#     .getOrCreate()


In [5]:
# data_name = 'streamV2_tweetnet_2023-06'

In [6]:
# Read the JSONL file
# df = pd.read_json(f'../data/{data_name}.jsons', lines=True)

In [7]:
# Define the transformation function
def extract_fields(json_obj):
    tweet_id = json_obj.get('tweet_id', '')
    tweet_type = json_obj.get('tweet_type', '')
    hashtags = json_obj.get('hashtags', [])
    mentions = json_obj.get('mentions', [])
    return {
        'tweet_id': tweet_id,
        'tweet_type': tweet_type,
        'hashtags': hashtags,
        'mentions': mentions
    }


In [2]:
# API call
def fetch_additional_info(tweet_id):
    url = "https://cdn.syndication.twimg.com/tweet-result"
    querystring = {"id": tweet_id, "lang": "en", "token": "x"}
    payload = ""
    headers = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/114.0",
    "Accept": "*/*",
    "Accept-Language": "en-US,en;q=0.5",
    "Accept-Encoding": "gzip, deflate, br",
    "Origin": "https://platform.twitter.com",
    "Connection": "keep-alive",
    "Referer": "https://platform.twitter.com/",
    "Sec-Fetch-Dest": "empty",
    "Sec-Fetch-Mode": "cors",
    "Sec-Fetch-Site": "cross-site",
    "Pragma": "no-cache",
    "Cache-Control": "no-cache",
    "TE": "trailers"
    }
    try:
        response = requests.request("GET", url, data=payload, headers=headers, params=querystring)
        if response.status_code != 200:
            # print(f"Failed to fetch additional info for tweet_id {tweet_id}")
            return None
    except Exception as e:
        logging.error(f'Failed to fetch additional info for tweet_id {tweet_id}')
        return None
    return response.text


In [None]:
fetch_additional_info(1630682530465587204)

'{"__typename":"Tweet","in_reply_to_screen_name":"DrSinanOgan","in_reply_to_status_id_str":"1664012893799448579","in_reply_to_user_id_str":"177884455","lang":"und","favorite_count":0,"created_at":"2023-05-31T20:59:55.000Z","display_text_range":[13,22],"entities":{"hashtags":[],"urls":[],"user_mentions":[{"id_str":"177884455","indices":[0,12],"name":"Dr. Sinan Oğan","screen_name":"DrSinanOgan"}],"symbols":[]},"id_str":"1664013863526735874","text":"@DrSinanOgan Offf 🤦🤦","user":{"id_str":"1256361951862689797","name":"Ivan Karamazov","profile_image_url_https":"https://pbs.twimg.com/profile_images/1673470909993168899/vHLg8HHU_normal.jpg","screen_name":"fyodorkrmzv","verified":false,"is_blue_verified":false,"profile_image_shape":"Circle"},"edit_control":{"edit_tweet_ids":["1664013863526735874"],"editable_until_msecs":"1685568595000","is_edit_eligible":false,"edits_remaining":"5"},"conversation_count":0,"news_action_type":"conversation","parent":{"lang":"tr","reply_count":14714,"retweet_count

In [10]:

def parse_api_response(api_response):
    if not api_response:
        return {}
    try:
        parsed_data = json.loads(api_response)
    except json.JSONDecodeError:
        logging.error(f'Failed parse_api_response {api_response}')
        return {}
    
    lang = parsed_data.get('lang', '')
    favorite_count = parsed_data.get('favorite_count', 0)
    created_at = parsed_data.get('created_at', '')
    text = parsed_data.get('text', '')
    parent_tweet_id = parsed_data.get('parent', {}).get('id_str', '')
    
    return {
        'lang': lang,
        'favorite_count': favorite_count,
        'created_at': created_at,
        'text': text,
        'parent_tweet_id': parent_tweet_id
    }


In [11]:
# parsed_api_response_test = parse_api_response(api_response_test)

In [12]:
# this cell not used

# # Initialize a CSV writer and write the header
# with open(f'output_{data_name}.csv', 'w', newline='') as f:
#     writer = csv.DictWriter(f, fieldnames=['tweet_id', 'tweet_type', 'hashtags', 'mentions', 'lang', 'favorite_count', 'created_at', 'text', 'parent_tweet_id'])
#     writer.writeheader()

# Function to write a single row to the CSV file
def write_row_to_csv(row):
    try:
        with open('output_{data_name}.csv', 'a', newline='') as f:
            writer = csv.DictWriter(f, fieldnames=['tweet_id', 'tweet_type', 'hashtags', 'mentions', 'lang', 'favorite_count', 'created_at', 'text', 'parent_tweet_id'])
            writer.writerow(row)
    except Exception as e:
        print(f"Failed to write row to CSV: {e}")


# Function to process a single JSON object (this includes the API call)
def process_json_object(json_obj):
    try: 
        # Extract initial fields
        row = extract_fields(json_obj)
        
        # Fetch additional info from API (You'll have to add your API logic)
        api_response = fetch_additional_info(row['tweet_id'])
        
        # Parse the API response
        additional_info = parse_api_response(api_response)
        
        # Merge initial data and additional info
        row.update(additional_info)
        
        # Write the row to CSV
        write_row_to_csv(row)
    except Exception as e:
        print(f"Failed to process JSON object: {e}")



In [13]:
def custom_write_csv(df: pd.DataFrame, output_path: str, data_name: str):
    file_name = os.path.join(output_path, f'output_{data_name}.csv')
    try:
        df.to_csv(file_name, mode='a', index=False, header=False)
    except Exception as e:
        logging.error(f'Failed to write chunk {e}')
        

# Define a function to process a chunk of data
def process_chunk(df_chunk: pd.DataFrame, output_path: str, data_name: str):
    logging.info(f'Processing chunk entered')
    results = []
    for idx, row in df_chunk.iterrows():
        row_dict = row.to_dict()
        logging.info(f'Processing tweet id: {row_dict["tweet_id"]}')
        api_response = fetch_additional_info(row_dict['tweet_id'])
        additional_info = parse_api_response(api_response)
        row_dict.update(additional_info)
        # Convert hashtags and mentions array to a comma-separated string
        row_dict['hashtags'] = ','.join(row_dict['hashtags']) if isinstance(row_dict['hashtags'], (list, tuple)) else ''
        row_dict['mentions'] = ','.join(row_dict['mentions']) if isinstance(row_dict['mentions'], (list, tuple)) else ''
        results.append(row_dict)
    result_df = pd.DataFrame(results)
    # Filter the DataFrame to only include the columns specified in the schema
    result_df = result_df[['tweet_id', 'tweet_type', 'hashtags', 'mentions', 'lang', 'favorite_count', 'created_at', 'text', 'parent_tweet_id']]
    custom_write_csv(result_df, output_path, data_name)  # Pass output_path and data_name to custom_write_csv

def process_data_in_parallel(df, output_path: str, data_name: str):
    with concurrent.futures.ThreadPoolExecutor() as executor:
        chunks = np.array_split(df, 10)
        # Use a lambda function to pass the output_path and data_name arguments to process_chunk
        executor.map(lambda chunk: process_chunk(chunk, output_path, data_name), chunks)


In [14]:
input_folder_path = './test_Folder'
output_folder_path = '../data/output/test_Folder'
os.makedirs(output_folder_path, exist_ok=True)  # Create output folder if it doesn't exist

def process_file(file_path, output_path):
    # Extract data_name from the file path
    data_name = os.path.basename(file_path).replace('.jsons', '')
    df = pd.read_json(file_path, lines=True)
    
    # Create the output directory if it doesn't exist
    os.makedirs(output_path, exist_ok=True)
    
    # Write the header to the output file
    output_file = os.path.join(output_path, f'output_{data_name}.csv')
    header_df = pd.DataFrame(columns=['tweet_id', 'tweet_type', 'hashtags', 'mentions', 'lang', 'favorite_count', 'created_at', 'text', 'parent_tweet_id'])
    header_df.to_csv(output_file, index=False)
    
    # Process each chunk in parallel
    process_data_in_parallel(df, output_path, data_name)

def process_all_files_in_folder(folder_path, output_folder_path):
    for file_name in os.listdir(folder_path):
        if file_name.endswith('.jsons'):
            file_path = os.path.join(folder_path, file_name)
            logging.info(f'Processing file: {file_name}')
            process_file(file_path, output_folder_path)

In [None]:
process_all_files_in_folder(input_folder_path, output_folder_path)

In [None]:
# # Call the function to process all files in the specified folder
# file_path = "./test_Folder/streamV2_tweetnet_2023-06_0.jsons"
# # Extract data_name from the file path
# data_name = os.path.basename(file_path).replace('.jsons', '')
# df = pd.read_json(file_path, lines=True)
# output_file = os.path.join(output_folder_path, f'output_{data_name}.csv')  # Update this line to use output_folder_path

#     # Write the header to the output file
# header_df = pd.DataFrame(columns=['tweet_id', 'tweet_type', 'hashtags', 'mentions', 'lang', 'favorite_count', 'created_at', 'text', 'parent_tweet_id'])
# header_df.to_csv(output_file, index=False)
# process_chunk(df, output_file)

## Test

In [None]:
import warnings
import pandas as pd
import json
import csv
import requests
from typing import Dict, List, Any
import logging
import numpy as np
import concurrent.futures
import os
import re

# Define the transformation function
def extract_fields(json_obj):
    tweet_id = json_obj.get('tweet_id', '')
    tweet_type = json_obj.get('tweet_type', '')
    hashtags = json_obj.get('hashtags', [])
    mentions = json_obj.get('mentions', [])
    return {
        'tweet_id': tweet_id,
        'tweet_type': tweet_type,
        'hashtags': hashtags,
        'mentions': mentions
    }

# API call
def fetch_additional_info(tweet_id):
    url = "https://cdn.syndication.twimg.com/tweet-result"
    querystring = {"id": tweet_id, "lang": "en", "token": "x"}
    payload = ""
    headers = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/114.0",
    "Accept": "*/*",
    "Accept-Language": "en-US,en;q=0.5",
    "Accept-Encoding": "gzip, deflate, br",
    "Origin": "https://platform.twitter.com",
    "Connection": "keep-alive",
    "Referer": "https://platform.twitter.com/",
    "Sec-Fetch-Dest": "empty",
    "Sec-Fetch-Mode": "cors",
    "Sec-Fetch-Site": "cross-site",
    "Pragma": "no-cache",
    "Cache-Control": "no-cache",
    "TE": "trailers"
    }
    try:
        response = requests.request("GET", url, data=payload, headers=headers, params=querystring)
        if response.status_code != 200:
            # print(f"Failed to fetch additional info for tweet_id {tweet_id}")
            return None
    except Exception as e:
        logging.error(f'Failed to fetch additional info for tweet_id {tweet_id}')
        return None
    return response.text

# Extract the additional info from API response
def parse_api_response(api_response):
    if not api_response:
        return {}
    try:
        parsed_data = json.loads(api_response)
    except json.JSONDecodeError:
        logging.error(f'Failed: parse_api_response. Error.')
        return {}
    
    lang = parsed_data.get('lang', '')
    favorite_count = parsed_data.get('favorite_count', 0)
    created_at = parsed_data.get('created_at', '')
    text = parsed_data.get('text', '')
    parent_tweet_id = parsed_data.get('parent', {}).get('id_str', '')

    # New:
    hashtags = parsed_data.get('entities', {}).get('hashtags', '')
    mentions = parsed_data.get('user_mentions', {}).get('user_mentions', '')
    tweet_type = parsed_data.get('__typename', '')
    tweet_id = parsed_data.get('id_str', '')

    return {
        'tweet_id': tweet_id,
        'tweet_type': tweet_type,
        'hashtags': hashtags,
        'mentions': mentions,
        'lang': lang,
        'favorite_count': favorite_count,
        'created_at': created_at,
        'text': text,
        'parent_tweet_id': parent_tweet_id,         
    }

# Last function in the process, which converts dataframe to csv file
def custom_write_csv(df: pd.DataFrame, output_path: str, data_name: str):
    file_name = os.path.join(output_path, f'output_{data_name}.csv')
    try:
        df.to_csv(file_name, mode='a', index=False, header=False)
    except Exception as e:
        logging.error(f'Failed to write chunk {e}')
        

# Processes a chunk of data. The function is used in process_data_in_parallel, which chunks the given df into 10 chunks
def process_chunk(df_chunk: pd.DataFrame, output_path: str, data_name: str):
    results = []

    for idx, row in df_chunk.iterrows():
        row_dict = row.to_dict()
        try:
            api_response = fetch_additional_info(row_dict['tweet_id'])
            additional_info = parse_api_response(api_response)
            row_dict.update(additional_info)
        except Exception as e: 
            logging.error(f'Failed to process chunk {e}')

        # Convert hashtags and mentions array to a comma-separated string
        #row_dict['hashtags'] = ','.join(row_dict['hashtags']) if isinstance(row_dict['hashtags'], (list, tuple)) else ''
        #row_dict['mentions'] = ','.join(row_dict['mentions']) if isinstance(row_dict['mentions'], (list, tuple)) else ''
        results.append(row_dict)
        #print(f"results inside: {results}")
    #print(f"Results after everything: {results}")
    result_df = pd.DataFrame(results)
    #print(f"Result_df: {result_df}")
    
    # Filter the DataFrame to only include the columns specified in the schema
    #result_df = result_df[['tweet_id', 'tweet_type', 'hashtags', 'mentions', 'lang', 'favorite_count', 'created_at', 'text', 'parent_tweet_id', 'entities']]
    
    custom_write_csv(result_df, output_path, data_name)  # Pass output_path and data_name to custom_write_csv

# Used for parallel processing, main function here is process_chunk
def process_data_in_parallel(df, output_path: str, data_name: str):
    with concurrent.futures.ThreadPoolExecutor() as executor:
        chunks = np.array_split(df, 10)
        # Use a lambda function to pass the output_path and data_name arguments to process_chunk
        executor.map(lambda chunk: process_chunk(chunk, output_path, data_name), chunks)

# processes a file, which is written for .txt, calls process_data_in_parallel
def process_file(file_path, output_path):
    logging.info(f'Processing file path: {file_path}')
    # Extract data_name from the file path
    data_name = os.path.basename(file_path).replace('.txt', '')
    
    # Read txt file and convert it to DataFrame
    with open(file_path, 'r') as file:
        tweet_ids = [int(line.strip()) for line in file]
    df = pd.DataFrame(tweet_ids, columns=['tweet_id'])
    # Create the output directory if it doesn't exist
    os.makedirs(output_path, exist_ok=True)
    
    # Write the header to the output file
    output_file = os.path.join(output_path, f'output_{data_name}.csv')
    header_df = pd.DataFrame(columns=['tweet_id', 'tweet_type', 'hashtags', 'mentions', 'lang', 'favorite_count', 'created_at', 'text', 'parent_tweet_id'])
    header_df.to_csv(output_file, index=False)
    
    # Process each chunk in parallel
    process_data_in_parallel(df, output_path, data_name)

# Processes all the files in the folder, calls process_file
# def process_all_files_in_folder(folder_path, output_folder_path):
#     for file_name in os.listdir(folder_path):
#         if file_name.endswith('.txt'):
#             file_path = os.path.join(folder_path, file_name)
#             logging.info(f'Processing file: {file_name}')
#             process_file(file_path, output_folder_path)

def extract_number(filename):
    # Regular expression to match a sequence of digits
    match = re.search(r'_(\d+)\.txt$', filename)
    return int(match.group(1)) if match else 0

def process_all_files_in_folder(folder_path, output_folder_path):
    # Get all files in folder_path that end with .txt
    files = [f for f in os.listdir(folder_path) if f.endswith('.txt')]
    # Sort files based on the numeric part of the filename
    sorted_files = sorted(files, key=extract_number)
    
    for file_name in sorted_files:
        file_path = os.path.join(folder_path, file_name)
        logging.info(f'Processing file: {file_name}')
        #process_file(file_path, output_folder_path)

In [2]:
warnings.filterwarnings(action='ignore', category=FutureWarning)

# Configure logging to write to a file and the console
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler(filename='processing.log'),
        logging.StreamHandler()
    ]
)

input_folder_path = '../data/tracking_tweetids_2023-03'
output_folder_path = '../data/output/tracking_tweetids_2023-03_splitted'

# Create output folder if it doesn't exist
os.makedirs(output_folder_path, exist_ok=True)  
process_all_files_in_folder(input_folder_path, output_folder_path)
# process_file(input_file, output_folder_path)
logging.info('Done processing all files')

2023-10-29 18:19:48,710 - root - INFO - Processing file: tracking_tweetids_2023-03_633.txt
2023-10-29 18:19:48,710 - root - INFO - Processing file: tracking_tweetids_2023-03_587.txt
2023-10-29 18:19:48,711 - root - INFO - Processing file: tracking_tweetids_2023-03_487.txt
2023-10-29 18:19:48,711 - root - INFO - Processing file: tracking_tweetids_2023-03_647.txt
2023-10-29 18:19:48,712 - root - INFO - Processing file: tracking_tweetids_2023-03_798.txt
2023-10-29 18:19:48,712 - root - INFO - Processing file: tracking_tweetids_2023-03_1026.txt
2023-10-29 18:19:48,713 - root - INFO - Processing file: tracking_tweetids_2023-03_881.txt
2023-10-29 18:19:48,714 - root - INFO - Processing file: tracking_tweetids_2023-03_571.txt
2023-10-29 18:19:48,714 - root - INFO - Processing file: tracking_tweetids_2023-03_1053.txt
2023-10-29 18:19:48,714 - root - INFO - Processing file: tracking_tweetids_2023-03_212.txt
2023-10-29 18:19:48,715 - root - INFO - Processing file: tracking_tweetids_2023-03_550.t

In [3]:
info = fetch_additional_info(1688217309112901632)
parse_api_response(info)

{'lang': 'tr',
 'favorite_count': 4842,
 'created_at': '2023-08-06T15:55:46.000Z',
 'text': 'İstanbul Hukuk 2023 mezuniyet töreninde konuşma yapması gereken ancak yaptırılmayan okul birincilerden biri olarak olayların çok farklı yerlere çekilmesinden, sanki tek bir kişiye, konuşmasının içeriğinden dolayı konuşma yaptırılmamış algısının yaratılmasından çok rahatsızım. https://t.co/FgEhtxJqFj',
 'parent_tweet_id': ''}

In [5]:
info_str = json.loads(info)
info_str.get('entities', {})

{'hashtags': [],
 'urls': [],
 'user_mentions': [],
 'symbols': [],
 'media': [{'display_url': 'pic.twitter.com/FgEhtxJqFj',
   'expanded_url': 'https://twitter.com/Berkaydyilmaz/status/1688217309112901632/photo/1',
   'indices': [277, 300],
   'url': 'https://t.co/FgEhtxJqFj'}]}

In [None]:
# TODO: look if with the new information we can extract everything we need from the API call and parse it into csv
# TODO: we need it to work for just one txt file. How is the parallelization works there?

In [None]:
import os

def split_file(file_path, lines_per_file, output_path):
    with open(file_path, 'r') as file:
        lines = file.readlines()

    # Get the base name of the input file (without extension)
    base_name = os.path.basename(file_path).rsplit('.', 1)[0]

    for i in range(0, len(lines), lines_per_file):
        # Construct the path for the smaller files
        small_file_name = f"{base_name}_{i // lines_per_file}.txt"
        small_file_path = os.path.join(output_path, small_file_name)
        with open(small_file_path, 'w') as small_file:
            small_file.writelines(lines[i:i + lines_per_file])

# Ensure the output directory exists
os.makedirs('output', exist_ok=True)

# Call the function
split_file('../data/streamV2_tweetids_2023-06.txt', 100000, '../data/streamV2_tweetids_2023-06')
