In [None]:
# Import necessary libraries for data processing, database connection, Telegram API interaction, and logging

import asyncio  # Asynchronous programming support
import logging  # Logging for debugging and monitoring
import os  # Environment variables for credentials and session strings
import re  # Regular expressions for text processing
from getpass import getpass  # Interactive prompt for secrets not set in the environment

import emoji  # Emoji handling and removal from text
import nest_asyncio  # Enables nested event loops in Jupyter and other environments
import pandas as pd  # Data manipulation and analysis
import psycopg2  # PostgreSQL database connection
from psycopg2.extras import execute_values  # Batched multi-row INSERT statements
from telethon import TelegramClient  # Telegram API client for data extraction
from telethon.sessions import StringSession  # Session management for Telegram client

In [2]:
# Telegram API credentials are read from the environment so no secrets live in the notebook;
# anything not set is prompted for interactively (input is not echoed).
# NOTE(review): an api_hash was previously committed here in plain text — rotate it.
api_id = os.environ.get("TELEGRAM_API_ID") or getpass("Telegram api_id: ")
api_hash = os.environ.get("TELEGRAM_API_HASH") or getpass("Telegram api_hash: ")

# PostgreSQL settings: libpq-style environment variables, falling back to local defaults.
# The password has no default and is prompted for when PGPASSWORD is not set.
DB_NAME = os.environ.get("PGDATABASE", "E2E-DP")
DB_USER = os.environ.get("PGUSER", "postgres")
DB_PASSWORD = os.environ.get("PGPASSWORD") or getpass("PostgreSQL password: ")
DB_HOST = os.environ.get("PGHOST", "localhost")
DB_PORT = os.environ.get("PGPORT", "5432")

try:
    # Connect to PostgreSQL Database
    conn = psycopg2.connect(
        dbname=DB_NAME, user=DB_USER, password=DB_PASSWORD, host=DB_HOST, port=DB_PORT
    )
    cursor = conn.cursor()
    print("Connected to PostgreSQL")
except Exception as e:
    # Leave explicit sentinels so later cells can test `if conn:` instead of hitting NameError.
    conn = cursor = None
    print("Error connecting to PostgreSQL:", e)


Connected to PostgreSQL


In [3]:
# Read the raw Online Retail export and report its (rows, columns) size
df_ecom = pd.read_csv(filepath_or_buffer="Online_Retail.csv")
print(df_ecom.shape)

(1001200, 8)


In [4]:
# Show the first three rows of the raw retail data
print(df_ecom.iloc[:3])

  InvoiceNo StockCode                         Description  Quantity  \
0    536365    85123A  WHITE HANGING HEART T-LIGHT HOLDER         6   
1    536365     71053                 WHITE METAL LANTERN         6   
2    536365    84406B      CREAM CUPID HEARTS COAT HANGER         8   

      InvoiceDate  UnitPrice  CustomerID         Country  
0  12/1/2010 8:26       2.55     17850.0  United Kingdom  
1  12/1/2010 8:26       3.39     17850.0  United Kingdom  
2  12/1/2010 8:26       2.75     17850.0  United Kingdom  


In [5]:
# List of columns to clean
columns_to_clean = ["InvoiceNo", "StockCode", "Description", "Quantity", "UnitPrice", "CustomerID", "Country"]

# Ensure all columns in the list exist before cleaning
missing_columns = [col for col in columns_to_clean if col not in df_ecom.columns]
if missing_columns:
    # Stop here: saving would write the *uncleaned* frame under the "cleaned" file name,
    # and every downstream cell reads that file.
    print(f"Error: The following columns are missing: {', '.join(missing_columns)}")
else:
    # Drop rows with missing values in the selected columns
    df_ecom.dropna(subset=columns_to_clean, inplace=True)

    # Remove duplicate rows
    df_ecom.drop_duplicates(inplace=True)

    # Clamp negative values to 0 in the numeric columns (vectorized; text columns untouched).
    # NOTE(review): negative Quantity rows may be returns/cancellations; clamping keeps them as
    # zero-quantity lines instead of dropping them — confirm this is the intended treatment.
    numeric_columns = df_ecom[columns_to_clean].select_dtypes(include="number").columns
    if len(numeric_columns) > 0:
        df_ecom[numeric_columns] = df_ecom[numeric_columns].clip(lower=0)

    print("Cleaning completed successfully!")

    # CustomerID is read as float because of the NaNs dropped above; store it as int
    df_ecom["CustomerID"] = pd.to_numeric(df_ecom["CustomerID"], errors="coerce").fillna(0).astype(int)

    # Save the cleaned file
    df_ecom.to_csv("cleaned_Online_Rental_data.csv", index=False)
    print("Cleaned file saved as cleaned_Online_Rental_data.csv")


Cleaning completed successfully!
Cleaned file saved as cleaned_Online_Rental_data.csv


In [6]:
# Re-read the cleaned retail export written by the cleaning cell and report its size
df_ecom = pd.read_csv(filepath_or_buffer="cleaned_Online_Rental_data.csv")
print(df_ecom.shape)

(738757, 8)


In [7]:
 # Show the first three rows of the cleaned retail data
print(df_ecom.iloc[:3])

  InvoiceNo StockCode                         Description  Quantity  \
0    536365    85123A  WHITE HANGING HEART T-LIGHT HOLDER         6   
1    536365     71053                 WHITE METAL LANTERN         6   
2    536365    84406B      CREAM CUPID HEARTS COAT HANGER         8   

      InvoiceDate  UnitPrice  CustomerID         Country  
0  12/1/2010 8:26       2.55       17850  United Kingdom  
1  12/1/2010 8:26       3.39       17850  United Kingdom  
2  12/1/2010 8:26       2.75       17850  United Kingdom  


In [8]:
nest_asyncio.apply()  # Allows running asyncio inside Jupyter

# List of Telegram channels to scrape
channels = [
    "easybuyethiopia",
    "Ecommerceaddis",
    "jiji_shop_ethiopia"
]

# Create a single Telegram client session.
# TELEGRAM_SESSION may hold a string produced earlier by StringSession.save(); reusing it avoids
# the interactive login on every Restart & Run All. When unset, StringSession(None) starts a
# fresh in-memory session exactly as before.
client = TelegramClient(StringSession(os.environ.get("TELEGRAM_SESSION")), api_id, api_hash)


async def scrape_telegram_data(channel_username, limit=5000):
    """Fetch up to ``limit`` recent messages from one channel.

    Args:
        channel_username: Channel/group username passed to ``client.iter_messages``.
        limit: Maximum number of messages to fetch.

    Returns:
        A DataFrame with exactly the columns ``date``, ``message`` (``message.text``; may be
        missing, as the exported CSVs show NaN messages) and ``user_id`` (``message.sender_id``).
        On any error the error is printed and an empty DataFrame with the same columns is returned.
    """
    try:
        messages = []
        async for message in client.iter_messages(channel_username, limit=limit):
            messages.append({
                "date": message.date,
                "message": message.text,
                "user_id": message.sender_id  # Keeps negative IDs if from groups/channels
            })

        df = pd.DataFrame(messages, columns=["date", "message", "user_id"])  # Only these 3 columns
        print(f"Extracted {len(df)} messages from {channel_username}")
        return df

    except Exception as e:
        print(f"Error extracting data from {channel_username}: {e}")
        return pd.DataFrame(columns=["date", "message", "user_id"])  # Ensure correct structure


async def main():
    """Log in, scrape every channel in ``channels`` and save each to ``{channel}_messages.csv``.

    The client is disconnected in a ``finally`` block so a failure part-way through
    (e.g. a CSV write error) does not leave the Telegram connection open.
    """
    try:
        await client.start()
        print("Connected to Telegram successfully!")

        for channel in channels:
            df_telegram = await scrape_telegram_data(channel)
            if not df_telegram.empty:
                filename = f"{channel}_messages.csv"
                df_telegram.to_csv(filename, index=False)  # Save with only 3 columns
                print(f"Data saved to {filename}")
            else:
                print(f"No data extracted from {channel}")
    finally:
        await client.disconnect()


# Run the scraper
asyncio.run(main())


Signed in successfully as ✨; remember to not break the ToS or you will risk an account ban!
Connected to Telegram successfully!
Extracted 5000 messages from easybuyethiopia
Data saved to easybuyethiopia_messages.csv
Extracted 5000 messages from Ecommerceaddis
Data saved to Ecommerceaddis_messages.csv
Extracted 2288 messages from jiji_shop_ethiopia
Data saved to jiji_shop_ethiopia_messages.csv


In [9]:
# Reload the three raw channel exports written by the scraper and report each one's size
df_telegram_1, df_telegram_2, df_telegram_3 = [
    pd.read_csv(path)
    for path in (
        "easybuyethiopia_messages.csv",
        "Ecommerceaddis_messages.csv",
        "jiji_shop_ethiopia_messages.csv",
    )
]
print(df_telegram_1.shape, "\n")
print(df_telegram_2.shape, "\n")
print(df_telegram_3.shape)

(5000, 3) 

(5000, 3) 

(2288, 3)


In [10]:
# Compare ID ranges: each raw channel export vs. the retail CustomerID column
for _user_ids in (df_telegram_1["user_id"], df_telegram_2["user_id"], df_telegram_3["user_id"]):
    print("Max user_id:", _user_ids.max(), "Min user_id:", _user_ids.min())
_customer_ids = df_ecom["CustomerID"]
print("Max CustomerID:", _customer_ids.max(), "Min CustomerID:", _customer_ids.min())
del _user_ids, _customer_ids

Max user_id: -1001226361445 Min user_id: -1001226361445
Max user_id: -1001337500007 Min user_id: -1001337500007
Max user_id: -1001400160497 Min user_id: -1001400160497
Max CustomerID: 18287 Min CustomerID: 12346


In [11]:
# Peek at the first three rows of each raw channel export
print(df_telegram_1.iloc[:3])
print(df_telegram_2.iloc[:3])
print(df_telegram_3.iloc[:3])

                        date  \
0  2024-11-17 10:06:26+00:00   
1  2024-11-17 10:06:26+00:00   
2  2024-11-17 10:06:26+00:00   

                                             message        user_id  
0  *The Most Sold G-Shock Model Is Now Available*... -1001226361445  
1                                                NaN -1001226361445  
2                                                NaN -1001226361445  
                        date  \
0  2025-02-07 15:51:30+00:00   
1  2025-02-07 09:10:48+00:00   
2  2025-02-07 05:18:01+00:00   

                                             message        user_id  
0  💥Mi 360° rotation security camera 2K\n\n👉ስራ ቦታ... -1001337500007  
1  📣 Multifunctional Shoe and Hat Rack\n\n✔️ ባለ ዘ... -1001337500007  
2  ✔️ ዉድ ደንበኞቻችን ከታች ያስቀመጥነውን ሊንክ ተጫነው የልጆች ዕቃ የሚ... -1001337500007  
                        date  \
0  2025-02-07 07:13:22+00:00   
1  2025-02-07 07:13:21+00:00   
2  2025-02-07 07:13:21+00:00   

                                             message 

In [13]:
# Timestamped INFO-level log records for the per-file cleaning loop
logging.basicConfig(format='%(asctime)s - %(levelname)s - %(message)s', level=logging.INFO)

# Raw per-channel exports produced by the scraper cell
csv_files = [
    "easybuyethiopia_messages.csv",
    "Ecommerceaddis_messages.csv",
    "jiji_shop_ethiopia_messages.csv",
]

# Column roles: numeric ID column(s), free-text column(s), and the timestamp column
columns_to_clean, text_columns, date_column = ["user_id"], ["message"], "date"

def remove_emojis(text):
    """Strip every emoji from ``text``; non-string inputs (e.g. NaN, None) are returned unchanged."""
    if not isinstance(text, str):
        return text
    return emoji.replace_emoji(text, replace="")

# Loop through each file, clean it, and save
for file in csv_files:
    try:
        # Load the CSV file
        df_telegram = pd.read_csv(file, encoding="utf-8")
        logging.info(f"Loaded {file}: Shape before cleaning -> {df_telegram.shape}")

        # Check if required columns exist
        missing_columns = [col for col in columns_to_clean + text_columns + [date_column] if col not in df_telegram.columns]
        if missing_columns:
            logging.error(f"⚠ The following columns are missing in {file}: {', '.join(missing_columns)}")
            continue  # Skip to next file

        # Remove exact duplicate rows (same date, message and user_id)
        initial_rows = df_telegram.shape[0]
        df_telegram = df_telegram.drop_duplicates()
        logging.info(f"🗑 Removed {initial_rows - df_telegram.shape[0]} duplicate rows.")

        # Coerce user_id to nullable Int64; unparseable or missing IDs become 0.
        # Int64 and PostgreSQL BIGINT are both signed 64-bit, so every value already fits;
        # the former explicit range filter could never remove a row and has been dropped.
        df_telegram["user_id"] = pd.to_numeric(df_telegram["user_id"], errors="coerce").fillna(0).astype("Int64")

        # Parse dates as timezone-aware UTC. The raw exports carry "+00:00" offsets; utc=True
        # keeps the column a single datetime dtype even if offsets were ever mixed.
        df_telegram[date_column] = pd.to_datetime(df_telegram[date_column], errors="coerce", utc=True)

        # Remove rows with invalid dates, and report how many were lost
        rows_before_date_check = len(df_telegram)
        df_telegram = df_telegram.dropna(subset=[date_column])
        invalid_dates = rows_before_date_check - len(df_telegram)
        if invalid_dates:
            logging.warning(f"Dropped {invalid_dates} rows with unparseable dates.")

        # Fill missing text with "unknown", then strip emojis
        for col in text_columns:
            df_telegram[col] = df_telegram[col].fillna("unknown").apply(remove_emojis)

        # Save cleaned data
        cleaned_filename = f"cleaned_{file}"
        df_telegram.to_csv(cleaned_filename, index=False, encoding="utf-8")
        logging.info(f"Cleaned data saved to {cleaned_filename}: Final shape -> {df_telegram.shape}\n")

    except FileNotFoundError:
        logging.error(f"{file} not found. Ensure the scraper ran successfully.\n")
    except Exception as e:
        logging.error(f"An error occurred while processing {file}: {e}\n")


2025-02-07 18:56:30,306 - INFO - Loaded easybuyethiopia_messages.csv: Shape before cleaning -> (5000, 3)
2025-02-07 18:56:30,317 - INFO - 🗑 Removed 2286 duplicate rows.
2025-02-07 18:56:31,013 - INFO - Cleaned data saved to cleaned_easybuyethiopia_messages.csv: Final shape -> (2714, 3)

2025-02-07 18:56:31,198 - INFO - Loaded Ecommerceaddis_messages.csv: Shape before cleaning -> (5000, 3)
2025-02-07 18:56:31,219 - INFO - 🗑 Removed 837 duplicate rows.
2025-02-07 18:56:33,793 - INFO - Cleaned data saved to cleaned_Ecommerceaddis_messages.csv: Final shape -> (4163, 3)

2025-02-07 18:56:33,793 - INFO - Loaded jiji_shop_ethiopia_messages.csv: Shape before cleaning -> (2288, 3)
2025-02-07 18:56:33,793 - INFO - 🗑 Removed 1056 duplicate rows.
2025-02-07 18:56:33,945 - INFO - Cleaned data saved to cleaned_jiji_shop_ethiopia_messages.csv: Final shape -> (1232, 3)



In [14]:
# Reload the three cleaned channel exports and report each one's size
df_telegram_1, df_telegram_2, df_telegram_3 = [
    pd.read_csv(path)
    for path in (
        "cleaned_easybuyethiopia_messages.csv",
        "cleaned_Ecommerceaddis_messages.csv",
        "cleaned_jiji_shop_ethiopia_messages.csv",
    )
]
print(df_telegram_1.shape, "\n")
print(df_telegram_2.shape, "\n")
print(df_telegram_3.shape)

(2714, 3) 

(4163, 3) 

(1232, 3)


In [15]:
# Re-check ID ranges after cleaning: each channel export vs. the retail CustomerID column
for _user_ids in (df_telegram_1["user_id"], df_telegram_2["user_id"], df_telegram_3["user_id"]):
    print("Max user_id:", _user_ids.max(), "Min user_id:", _user_ids.min())
_customer_ids = df_ecom["CustomerID"]
print("Max CustomerID:", _customer_ids.max(), "Min CustomerID:", _customer_ids.min())
del _user_ids, _customer_ids


Max user_id: -1001226361445 Min user_id: -1001226361445
Max user_id: -1001337500007 Min user_id: -1001337500007
Max user_id: -1001400160497 Min user_id: -1001400160497
Max CustomerID: 18287 Min CustomerID: 12346


In [16]:
# Show column dtypes for the retail frame and each cleaned Telegram frame
for _frame in (df_ecom, df_telegram_1, df_telegram_2):
    print(_frame.dtypes, "\n")
print(df_telegram_3.dtypes)
del _frame



InvoiceNo       object
StockCode       object
Description     object
Quantity         int64
InvoiceDate     object
UnitPrice      float64
CustomerID       int64
Country         object
dtype: object 

date       object
message    object
user_id     int64
dtype: object 

date       object
message    object
user_id     int64
dtype: object 

date       object
message    object
user_id     int64
dtype: object


In [27]:
# Close any connection still open from an earlier cell (or an earlier run of this cell)
# before reconnecting; otherwise each re-run leaves an idle server connection behind.
# psycopg2's `closed` attribute is 0 while the connection is open.
_previous_conn = globals().get("conn")
if _previous_conn is not None and not _previous_conn.closed:
    _previous_conn.close()
del _previous_conn

#Connect to PostgreSQL
conn = psycopg2.connect(
    dbname=DB_NAME, user=DB_USER, password=DB_PASSWORD, host=DB_HOST, port=DB_PORT
)
cursor = conn.cursor()

In [28]:
try:
    # The insert cell reloads both tables in full from the cleaned CSVs, so rebuild both here.
    # Previously only Rental_Dataset was dropped; telegram_messages used CREATE TABLE IF NOT
    # EXISTS, so every re-run of the notebook appended another copy of each message.
    cursor.execute("""
        DROP TABLE IF EXISTS Rental_Dataset;
    """)
    cursor.execute("""
        DROP TABLE IF EXISTS telegram_messages;
    """)
    cursor.execute("""
        CREATE TABLE Rental_Dataset (
            order_id SERIAL PRIMARY KEY,
            InvoiceNo VARCHAR(50),
            StockCode VARCHAR(20),
            Description TEXT,
            Quantity INTEGER,
            InvoiceDate TIMESTAMP,
            UnitPrice DECIMAL(10, 2),
            CustomerID BIGINT,
            Country VARCHAR(100)
        )
    """)

    # NOTE(review): user_id holds numeric Telegram IDs but is TEXT because the insert cell
    # converts it with astype(str); BIGINT would allow numeric comparisons and joins.
    cursor.execute("""
        CREATE TABLE telegram_messages (
            message_id SERIAL PRIMARY KEY,
            date TIMESTAMP,
            message TEXT,
            user_id TEXT
        )
    """)

    conn.commit()
    print("Tables created successfully")

except Exception as e:
    conn.rollback()  # Rollback the transaction if there is an error
    print("Error creating tables:", e)


Tables created successfully


In [33]:
# ---- Insert Retail Data into PostgreSQL ----
# The tuple order must match the INSERT column list, so select the columns explicitly
# instead of relying on the CSV's physical column order.
retail_columns = ["InvoiceNo", "StockCode", "Description", "Quantity", "InvoiceDate", "UnitPrice", "CustomerID", "Country"]
try:
    df_ecom = pd.read_csv("cleaned_Online_Rental_data.csv")
    print(f"Loaded Retail Data: {df_ecom.shape}")

    Retail_data = list(df_ecom[retail_columns].itertuples(index=False, name=None))

    # execute_values sends page_size rows per INSERT statement; executemany issued one
    # statement (one server round trip) per row, which is very slow for ~740k rows.
    # NOTE(review): InvoiceDate is sent as text such as "12/1/2010 8:26" and parsed by the
    # server, so the stored date depends on PostgreSQL's DateStyle (MDY vs DMY) — confirm MDY.
    execute_values(cursor, """
        INSERT INTO Rental_Dataset (InvoiceNo, StockCode, Description, Quantity, InvoiceDate, UnitPrice, CustomerID, Country)
        VALUES %s
    """, Retail_data, page_size=1000)

    conn.commit()
    print(f"Successfully inserted {len(Retail_data)} records into Rental_Dataset.")

except Exception as e:
    conn.rollback()
    print(f"Error inserting Retail data: {e}")

# ---- Insert Telegram Data into PostgreSQL ----
csv_files = [
    "cleaned_easybuyethiopia_messages.csv",
    "cleaned_Ecommerceaddis_messages.csv",
    "cleaned_jiji_shop_ethiopia_messages.csv"
]

for file in csv_files:
    try:
        df_telegram = pd.read_csv(file)
        print(f"Processing {file}: Shape -> {df_telegram.shape}")

        # telegram_messages.date is TIMESTAMP (no time zone): convert to UTC and drop the offset
        # so the UTC wall-clock time is stored as-is. A tz-aware value would be cast through the
        # session's TimeZone setting and could be shifted.
        df_telegram["date"] = pd.to_datetime(df_telegram["date"], errors="coerce", utc=True).dt.tz_localize(None)
        df_telegram["user_id"] = pd.to_numeric(df_telegram["user_id"], errors="coerce").astype("Int64")

        # Drop rows with missing required fields BEFORE converting user_id to text:
        # astype(str) turns a missing ID into a literal string that dropna no longer sees as missing.
        df_telegram = df_telegram.dropna(subset=["date", "message", "user_id"])
        df_telegram["user_id"] = df_telegram["user_id"].astype(str)  # user_id column is TEXT

        # Convert DataFrame to list of tuples for bulk insertion
        telegram_data = list(df_telegram[["date", "message", "user_id"]].itertuples(index=False, name=None))

        if telegram_data:  # Insert only if there's data
            execute_values(cursor, """
                INSERT INTO telegram_messages (date, message, user_id)
                VALUES %s
            """, telegram_data, page_size=1000)
            conn.commit()
            print(f"Successfully inserted {len(telegram_data)} records from {file} into PostgreSQL.")
        else:
            print(f"No valid data to insert from {file}.")

    except Exception as e:
        conn.rollback()
        print(f"Error processing {file}: {e}")

conn.commit()
print("Data insertion completed successfully.")

Loaded Retail Data: (738757, 8)
Successfully inserted 738757 records into Rental_Dataset.
Processing cleaned_easybuyethiopia_messages.csv: Shape -> (2714, 3)
Successfully inserted 2714 records from cleaned_easybuyethiopia_messages.csv into PostgreSQL.
Processing cleaned_Ecommerceaddis_messages.csv: Shape -> (4163, 3)
Successfully inserted 4163 records from cleaned_Ecommerceaddis_messages.csv into PostgreSQL.
Processing cleaned_jiji_shop_ethiopia_messages.csv: Shape -> (1232, 3)
Successfully inserted 1232 records from cleaned_jiji_shop_ethiopia_messages.csv into PostgreSQL.
Data insertion completed successfully.


In [68]:
# Function to fetch data from PostgreSQL
def fetch_data(query, params=None, cur=None):
    """Run a query and return its result set as a DataFrame.

    Args:
        query: SQL text. Pass values through ``params`` with %s placeholders rather than
            formatting them into the string.
        params: Optional sequence/mapping of query parameters, forwarded to ``execute``.
        cur: Optional DB-API cursor; defaults to the notebook-level ``cursor``.

    Returns:
        A DataFrame whose columns are the result's column names, or None if the query
        failed (the error is printed and the cursor's connection is rolled back).
    """
    try:
        cur = cursor if cur is None else cur
        cur.execute(query, params)
        data = cur.fetchall()
        col_names = [desc[0] for desc in cur.description]  # Get column names
        return pd.DataFrame(data, columns=col_names)
    except Exception as e:
        print(f"Error executing query: {e}")
        # A failed statement leaves a psycopg2 transaction aborted; roll back so the next
        # query on this connection doesn't fail with "current transaction is aborted".
        connection = getattr(cur, "connection", None)
        if connection is not None and not connection.closed:
            try:
                connection.rollback()
            except Exception as rollback_error:
                print(f"Rollback failed: {rollback_error}")
        return None

# Sample queries: the first 10 rows of each table (adjust LIMIT / filters as needed)
query_retail = "SELECT * FROM Rental_Dataset LIMIT 10;"  # Modify as needed
query_telegram = "SELECT * FROM telegram_messages LIMIT 10;"  # Modify as needed

df_retail = fetch_data(query_retail)
print("\n Retail Data (Sample):")
print(df_retail)

df_telegram = fetch_data(query_telegram)
print("\n telegram Messages (Sample):")
print(df_telegram)

# Release the cursor first, then its connection (each only if it is truthy)
for _db_handle in (cursor, conn):
    if _db_handle:
        _db_handle.close()
del _db_handle
print("✅ PostgreSQL connection closed.")



 Retail Data (Sample):
   order_id invoiceno stockcode                          description  \
0         1    536365    85123A   WHITE HANGING HEART T-LIGHT HOLDER   
1         2    536365     71053                  WHITE METAL LANTERN   
2         3    536365    84406B       CREAM CUPID HEARTS COAT HANGER   
3         4    536365    84029G  KNITTED UNION FLAG HOT WATER BOTTLE   
4         5    536365    84029E       RED WOOLLY HOTTIE WHITE HEART.   
5         6    536365     22752         SET 7 BABUSHKA NESTING BOXES   
6         7    536365     21730    GLASS STAR FROSTED T-LIGHT HOLDER   
7         8    536366     22633               HAND WARMER UNION JACK   
8         9    536366     22632            HAND WARMER RED POLKA DOT   
9        10    536367     84879        ASSORTED COLOUR BIRD ORNAMENT   

   quantity         invoicedate unitprice  customerid         country  
0         6 2010-12-01 08:26:00      2.55       17850  United Kingdom  
1         6 2010-12-01 08:26:00      3.