# Pipeline

## Installing the dependencies

In [24]:
!pip install psycopg2-binary pandas pymongo



## Connect to MongoDB
Connect to the NoSQL database and retreive raw dataset.

In [25]:
import pymongo
import pandas as pd

# MongoDB connection details
mongo_uri = "mongodb://mongo:27017"
database_name = "airflow"
collection_name = "raw_memes"

# Retreive dataset
client = pymongo.MongoClient(mongo_uri)
db = client[database_name]
collection = db[collection_name]

data = list(collection.find())
dataframe = pd.DataFrame(data)

client.close()

## Select columns

In [26]:
dataframe.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 28799 entries, 0 to 28798
Data columns (total 17 columns):
 #   Column                 Non-Null Count  Dtype  
---  ------                 --------------  -----  
 0   _id                    28799 non-null  object 
 1   title                  28799 non-null  object 
 2   url                    28799 non-null  object 
 3   last_update_source     28799 non-null  int64  
 4   category               28799 non-null  object 
 5   template_image_url     28799 non-null  object 
 6   meta                   28799 non-null  object 
 7   ld                     28798 non-null  object 
 8   added                  28606 non-null  float64
 9   details                28799 non-null  object 
 10  content                15406 non-null  object 
 11  tags                   15406 non-null  object 
 12  additional_references  15406 non-null  object 
 13  search_keywords        19539 non-null  object 
 14  parent                 12203 non-null  object 
 15  si

In [27]:
export = dataframe[['_id', 'title', 'url']]
export[['_id', 'title', 'url']] = export[['_id', 'title', 'url']].astype(str)

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  export[['_id', 'title', 'url']] = export[['_id', 'title', 'url']].astype(str)


## Insert cleaned data into PostgreSQL database

In [28]:
from sqlalchemy import create_engine, MetaData, Table, Column, Integer, String
from sqlalchemy.sql import text

def pandas_to_sqlalchemy_type(pandas_dtype):
    """
    Map Pandas DataFrame data types to SQLAlchemy data types.

    Args:
        pandas_dtype (str): Pandas DataFrame data type.

    Returns:
        sqlalchemy_type: Corresponding SQLAlchemy data type.
    """
    pandas_dtype = pandas_dtype.lower()

    if pandas_dtype == 'int64':
        return Integer
    elif pandas_dtype == 'float64':
        return Float
    elif pandas_dtype == 'object':
        return String
    elif pandas_dtype == 'bool':
        return Boolean
    elif pandas_dtype == 'datetime64':
        return DateTime
    else:
        # Add additional mappings for more data types as needed
        return String  # Default to String if no matching type is found

# Connect to PostgreSQL server
engine = create_engine('postgresql+psycopg2://airflow:airflow@postgres', pool_recycle=3600);
connection = engine.connect();

# Define your table structure
metadata = MetaData()

table_name = 'cleaned_memes'

# Create the table in the database
metadata.create_all(engine)

table = Table(
    table_name,
    metadata,
    *(
        Column(
            column, 
            pandas_to_sqlalchemy_type(str(export[column].dtype)), 
            primary_key=column=='_id'
        ) for column in export
    )
)

# Insert all records of export dataframe
export.to_sql(table_name, engine, if_exists='replace', index=False)

print("Table created and data inserted successfully.")

Table created and data inserted successfully.
