# Pipeline

## Installing the dependencies

In [2]:
!pip install psycopg2-binary pandas pymongo

Collecting psycopg2-binary
  Obtaining dependency information for psycopg2-binary from https://files.pythonhosted.org/packages/ce/85/62825cabc6aad53104b7b6d12eb2ad74737d268630032d07b74d4444cb72/psycopg2_binary-2.9.9-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata
  Downloading psycopg2_binary-2.9.9-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (4.4 kB)
Collecting pandas
  Obtaining dependency information for pandas from https://files.pythonhosted.org/packages/08/de/d4448c423484537ebc9373d3da2496a2e47f42ea11ff48e025cf49665471/pandas-2.1.3-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata
  Downloading pandas-2.1.3-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (18 kB)
Collecting pymongo
  Obtaining dependency information for pymongo from https://files.pythonhosted.org/packages/24/cb/c1824d7c5946c7750a4ce3e2b118b03b88975915f1d060f1f3ec5d9f49d7/pymongo-4.6.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86

## Connect to MongoDB
Connect to the NoSQL database and retreive raw dataset.

In [1]:
import pymongo
import pandas as pd

import json, csv
from csv import writer, reader

# MongoDB connection details
mongo_uri = "mongodb://mongo:27017"
database_name = "memes"
collection_name = "raw_memes"

# Retreive dataset
client = pymongo.MongoClient(mongo_uri)
db = client[database_name]
collection = db[collection_name]

data = list(collection.find())
dataframe = pd.DataFrame(data)

client.close()

ModuleNotFoundError: No module named 'pymongo'

In [4]:
def save2json(filename, dump):
    out_file = open(filename, "w")
    json.dump(dump, out_file, indent = 6)
    out_file.close()

## Select columns

In [20]:
if '_id' in dataframe.columns:
    dataframe.drop('_id', axis=1, inplace=True)

dataframe.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 28799 entries, 0 to 28798
Data columns (total 16 columns):
 #   Column                 Non-Null Count  Dtype  
---  ------                 --------------  -----  
 0   title                  28799 non-null  object 
 1   url                    28799 non-null  object 
 2   last_update_source     28799 non-null  int64  
 3   category               28799 non-null  object 
 4   template_image_url     28799 non-null  object 
 5   meta                   28799 non-null  object 
 6   ld                     28798 non-null  object 
 7   added                  28606 non-null  float64
 8   details                28799 non-null  object 
 9   content                15406 non-null  object 
 10  tags                   15406 non-null  object 
 11  additional_references  15406 non-null  object 
 12  search_keywords        19539 non-null  object 
 13  parent                 12203 non-null  object 
 14  siblings               12203 non-null  object 
 15  ch

In [21]:
raw_media_frames = []
metas = []
lds = []

In [22]:
for index, row in dataframe.iterrows():
    m = row.to_dict()
    
    if ("sites" in m['url'] or "culture" in m['url'] or "subculture" in m['url'] or "event" in m['url'] or "people" in m['url'] or "type" in m['url']  ):
        continue
    if 'content' in m and 'about' in m['content'] and 'text' in m['content']['about']:
        m['content']['about']['fulltext'] = "".join(m['content']['about']['text'])
    if 'content' in m and 'origin' in m['content'] and 'text' in m['content']['origin']:
        m['content']['origin']['fulltext'] = "".join(m['content']['origin']['text'])
    if 'content' in m and 'spread' in m['content'] and 'text' in m['content']['spread']:
        m['content']['spread']['fulltext'] = "".join(m['content']['spread']['text'])
    if 'content' in m and 'subsection' in m['content'] and 'text' in m['content']['subsection']:
        m['content']['subsection']['fulltext'] = "".join(m['content']['subsection']['text'])
    if 'meta' in m:
        metas.append(m.pop('meta', None))
    if 'ld' in m:
        lds.append(m.pop('ld', None))
    
    raw_media_frames.append(m)


In [23]:
save2json('data/kym.media.frames.meta.json', metas)

In [24]:
save2json('data/kym.media.frames.ls.json', lds)

In [25]:
save2json('data/kym.media.frames.json', raw_media_frames)

In [39]:
data = [[m['title'], m['url']] for m in raw_media_frames]
df_raw_media_frame = pd.DataFrame(data, columns=['title', 'meme'])

df_raw_media_frame.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 15298 entries, 0 to 15297
Data columns (total 2 columns):
 #   Column  Non-Null Count  Dtype 
---  ------  --------------  ----- 
 0   title   15298 non-null  object
 1   meme    15298 non-null  object
dtypes: object(2)
memory usage: 239.2+ KB


In [27]:
def extract_X(infile, x):
    xs = []
    for m in infile:
        if x in m.keys():
            if isinstance(m[x], list):
                for s in m[x]:
                    xs.append([m['url'], s])
            else:
                xs.append([m['url'], m[x]])

    df = pd.DataFrame(xs, columns=['meme', x])
    return df

In [29]:
df_siblings = extract_X(raw_media_frames, 'siblings')

df_siblings.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 296591 entries, 0 to 296590
Data columns (total 2 columns):
 #   Column    Non-Null Count   Dtype 
---  ------    --------------   ----- 
 0   meme      296591 non-null  object
 1   siblings  288424 non-null  object
dtypes: object(2)
memory usage: 4.5+ MB


In [30]:
df_parent = extract_X(raw_media_frames, 'parent')

df_parent.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 15298 entries, 0 to 15297
Data columns (total 2 columns):
 #   Column  Non-Null Count  Dtype 
---  ------  --------------  ----- 
 0   meme    15298 non-null  object
 1   parent  7131 non-null   object
dtypes: object(2)
memory usage: 239.2+ KB


In [31]:
df_children = extract_X(raw_media_frames, 'children')

df_children.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 159684 entries, 0 to 159683
Data columns (total 2 columns):
 #   Column    Non-Null Count   Dtype 
---  ------    --------------   ----- 
 0   meme      159684 non-null  object
 1   children  147802 non-null  object
dtypes: object(2)
memory usage: 2.4+ MB


In [34]:
def extract_types(infile):
    xs = []
    for m in infile:
        if 'details' in m.keys() and 'type' in m['details'].keys():
            for x in m['details']['type']:
                xs.append([m['url'], x])

    df = pd.DataFrame(xs, columns=['meme', 'type'])    
    return df

In [37]:
df_types = extract_types(raw_media_frames)

df_types.info()

print(df_types)

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 7835 entries, 0 to 7834
Data columns (total 2 columns):
 #   Column  Non-Null Count  Dtype 
---  ------  --------------  ----- 
 0   meme    7835 non-null   object
 1   type    7835 non-null   object
dtypes: object(2)
memory usage: 122.6+ KB
                                                   meme  \
0             https://knowyourmeme.com/memes/roflcopter   
1      https://knowyourmeme.com/memes/bitches-dont-know   
2       https://knowyourmeme.com/memes/in-soviet-russia   
3                   https://knowyourmeme.com/memes/domo   
4         https://knowyourmeme.com/memes/i-like-turtles   
...                                                 ...   
7830        https://knowyourmeme.com/memes/image-macros   
7831  https://knowyourmeme.com/memes/bait-and-switch...   
7832  https://knowyourmeme.com/memes/bait-and-switch...   
7833  https://knowyourmeme.com/memes/bait-and-switch...   
7834      https://knowyourmeme.com/memes/nightmare-fuel   


In [4]:
export = dataframe[['_id', 'title', 'url']]
export[['_id', 'title', 'url']] = export[['_id', 'title', 'url']].astype(str)

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  export[['_id', 'title', 'url']] = export[['_id', 'title', 'url']].astype(str)


## Insert cleaned data into PostgreSQL database

In [40]:
export = df_raw_media_frame
export = df_raw_media_frame.astype(str)

In [41]:
from sqlalchemy import create_engine, MetaData, Table, Column, Integer, String
from sqlalchemy.sql import text

def pandas_to_sqlalchemy_type(pandas_dtype):
    """
    Map Pandas DataFrame data types to SQLAlchemy data types.

    Args:
        pandas_dtype (str): Pandas DataFrame data type.

    Returns:
        sqlalchemy_type: Corresponding SQLAlchemy data type.
    """
    pandas_dtype = pandas_dtype.lower()

    if pandas_dtype == 'int64':
        return Integer
    elif pandas_dtype == 'float64':
        return Float
    elif pandas_dtype == 'object':
        return String
    elif pandas_dtype == 'bool':
        return Boolean
    elif pandas_dtype == 'datetime64':
        return DateTime
    else:
        # Add additional mappings for more data types as needed
        return String  # Default to String if no matching type is found

# Connect to PostgreSQL server
engine = create_engine('postgresql+psycopg2://airflow:airflow@postgres', pool_recycle=3600);
connection = engine.connect();

# Define your table structure
metadata = MetaData()

table_name = 'raw_media_meme'

# Create the table in the database
metadata.create_all(engine)

table = Table(
    table_name,
    metadata,
    *(
        Column(
            column, 
            pandas_to_sqlalchemy_type(str(export[column].dtype)), 
            primary_key=column=='_id'
        ) for column in export
    )
)

# Insert all records of export dataframe
export.to_sql(table_name, engine, if_exists='replace', index=False)

print("Table created and data inserted successfully.")

Table created and data inserted successfully.
