# Example Code for a Data Pipeline: RFID Card/Tag Scans

In [10]:
import os
from datetime import datetime

import pandas as pd
import sqlalchemy
#from airflow.models import DAG
#from airflow.operators.python_operator import PythonOperator


## Import, Sense-Check and Clean Datasets

In [11]:
# Load the RFID scan log and the card-to-user lookup from their csv files:

cardreads, usernames = (
    pd.read_csv(csv_name) for csv_name in ('card_reads.csv', 'usernames.csv')
)

In [12]:
# Show both raw frames, scans first, one after the other:
print(cardreads, usernames, sep="\n")

   RFID NUMBER  YEAR  MONTH  DAY  HOUR  MINUTE  SECOND
0   1708320199  2025      3   29    17      24      26
1   1708320199  2025      3   29    17      24      38
2   3825984899  2025      3   29    17      24      46
3   1708320199  2025      3   29    17      24      59
4   3825984899  2025      3   29    17      25      10
5   1708320199  2025      3   29    17      25      23
6   1708320199  2025      3   29    17      25      33
7   3825984899  2025      3   29    17      25      43
      Card ID             User
0  1708320199  Claire Lawrence
1  3825984899       John Smith


In [13]:
# Sense-check the scan data: column dtypes and non-null counts.
cardreads.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 8 entries, 0 to 7
Data columns (total 7 columns):
 #   Column       Non-Null Count  Dtype
---  ------       --------------  -----
 0   RFID NUMBER  8 non-null      int64
 1   YEAR         8 non-null      int64
 2   MONTH        8 non-null      int64
 3   DAY          8 non-null      int64
 4   HOUR         8 non-null      int64
 5   MINUTE       8 non-null      int64
 6   SECOND       8 non-null      int64
dtypes: int64(7)
memory usage: 580.0 bytes


In [14]:
# Sense-check the user lookup: column dtypes and non-null counts.
usernames.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 2 entries, 0 to 1
Data columns (total 2 columns):
 #   Column   Non-Null Count  Dtype 
---  ------   --------------  ----- 
 0   Card ID  2 non-null      int64 
 1   User     2 non-null      object
dtypes: int64(1), object(1)
memory usage: 164.0+ bytes


In [15]:
# Rename 'Card ID' to 'RFID NUMBER' so the join key has the same name in both tables.
# The renamed frame is rebound rather than modified with inplace=True, which
# keeps the cell a plain assignment that can be re-run safely.

usernames = usernames.rename(columns={'Card ID': 'RFID NUMBER'})

usernames


Unnamed: 0,RFID NUMBER,User
0,1708320199,Claire Lawrence
1,3825984899,John Smith


In [16]:
# Attach each user's name to their scans by inner-joining on the shared
# 'RFID NUMBER' key (scans with no matching user are dropped):

alldata = cardreads.merge(usernames, how='inner', on='RFID NUMBER')

alldata

Unnamed: 0,RFID NUMBER,YEAR,MONTH,DAY,HOUR,MINUTE,SECOND,User
0,1708320199,2025,3,29,17,24,26,Claire Lawrence
1,1708320199,2025,3,29,17,24,38,Claire Lawrence
2,3825984899,2025,3,29,17,24,46,John Smith
3,1708320199,2025,3,29,17,24,59,Claire Lawrence
4,3825984899,2025,3,29,17,25,10,John Smith
5,1708320199,2025,3,29,17,25,23,Claire Lawrence
6,1708320199,2025,3,29,17,25,33,Claire Lawrence
7,3825984899,2025,3,29,17,25,43,John Smith


In [17]:
# Check the merged frame: all scan columns plus 'User', with no nulls introduced.
alldata.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 8 entries, 0 to 7
Data columns (total 8 columns):
 #   Column       Non-Null Count  Dtype 
---  ------       --------------  ----- 
 0   RFID NUMBER  8 non-null      int64 
 1   YEAR         8 non-null      int64 
 2   MONTH        8 non-null      int64 
 3   DAY          8 non-null      int64 
 4   HOUR         8 non-null      int64 
 5   MINUTE       8 non-null      int64 
 6   SECOND       8 non-null      int64 
 7   User         8 non-null      object
dtypes: int64(7), object(1)
memory usage: 644.0+ bytes


In [18]:
# Cast every date and time part to string so that the parts can be
# concatenated into usable Date and Time columns:

for part in ('YEAR', 'MONTH', 'DAY', 'HOUR', 'MINUTE', 'SECOND'):
    alldata[part] = alldata[part].astype(str)

In [19]:
# Confirm the six date/time part columns are now object (string) dtype.
alldata.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 8 entries, 0 to 7
Data columns (total 8 columns):
 #   Column       Non-Null Count  Dtype 
---  ------       --------------  ----- 
 0   RFID NUMBER  8 non-null      int64 
 1   YEAR         8 non-null      object
 2   MONTH        8 non-null      object
 3   DAY          8 non-null      object
 4   HOUR         8 non-null      object
 5   MINUTE       8 non-null      object
 6   SECOND       8 non-null      object
 7   User         8 non-null      object
dtypes: int64(1), object(7)
memory usage: 644.0+ bytes


In [20]:
# Assemble the string parts into a day-first Date column (D/M/YYYY)
# and a Time column (H:M:S):

alldata['Date'] = alldata['DAY'].str.cat([alldata['MONTH'], alldata['YEAR']], sep="/")
alldata['Time'] = alldata['HOUR'].str.cat([alldata['MINUTE'], alldata['SECOND']], sep=":")

alldata

Unnamed: 0,RFID NUMBER,YEAR,MONTH,DAY,HOUR,MINUTE,SECOND,User,Date,Time
0,1708320199,2025,3,29,17,24,26,Claire Lawrence,29/3/2025,17:24:26
1,1708320199,2025,3,29,17,24,38,Claire Lawrence,29/3/2025,17:24:38
2,3825984899,2025,3,29,17,24,46,John Smith,29/3/2025,17:24:46
3,1708320199,2025,3,29,17,24,59,Claire Lawrence,29/3/2025,17:24:59
4,3825984899,2025,3,29,17,25,10,John Smith,29/3/2025,17:25:10
5,1708320199,2025,3,29,17,25,23,Claire Lawrence,29/3/2025,17:25:23
6,1708320199,2025,3,29,17,25,33,Claire Lawrence,29/3/2025,17:25:33
7,3825984899,2025,3,29,17,25,43,John Smith,29/3/2025,17:25:43


In [21]:
# Date and Time columns are put into date and time formats.
# The Date strings are day-first (e.g. "29/3/2025"), so the format is given
# explicitly: pandas' default inference is month-first, which would misread
# or reject any date whose day is 12 or lower (e.g. "1/3/2025").

alldata['Date'] = pd.to_datetime(alldata['Date'], format="%d/%m/%Y")
# Time stays a string column; the round trip through datetime zero-pads it
# to HH:MM:SS.
alldata['Time'] = pd.to_datetime(alldata['Time'], format="%H:%M:%S").dt.strftime("%H:%M:%S")

alldata


Unnamed: 0,RFID NUMBER,YEAR,MONTH,DAY,HOUR,MINUTE,SECOND,User,Date,Time
0,1708320199,2025,3,29,17,24,26,Claire Lawrence,2025-03-29,17:24:26
1,1708320199,2025,3,29,17,24,38,Claire Lawrence,2025-03-29,17:24:38
2,3825984899,2025,3,29,17,24,46,John Smith,2025-03-29,17:24:46
3,1708320199,2025,3,29,17,24,59,Claire Lawrence,2025-03-29,17:24:59
4,3825984899,2025,3,29,17,25,10,John Smith,2025-03-29,17:25:10
5,1708320199,2025,3,29,17,25,23,Claire Lawrence,2025-03-29,17:25:23
6,1708320199,2025,3,29,17,25,33,Claire Lawrence,2025-03-29,17:25:33
7,3825984899,2025,3,29,17,25,43,John Smith,2025-03-29,17:25:43


In [None]:
# Re-check the dtypes after the Date and Time conversion in the previous cell.
alldata.info()

Having assessed and cleaned up the data, the next step is to set up the pipeline. The code below would be put into its own .py file and deployed to Apache Airflow, which would run it on a schedule to perform the ETL and update a SQL table in an analytical database with the latest, clean data.

## Set Up: Create Functions to Extract, Transform and Load Data

In [2]:
# No dedicated extract function is needed here: both inputs are plain csv
# files read straight from the working folder.

cardreads_df, usernames_df = map(pd.read_csv, ('card_reads.csv', 'usernames.csv'))


In [3]:
# A function to transform the data (as done previously):

def transform_dfs(cardreads_df, usernames_df):
    """Join card scans to user names and build clean Date and Time columns.

    Parameters
    ----------
    cardreads_df : pandas.DataFrame
        Scan records with the columns 'RFID NUMBER', 'YEAR', 'MONTH', 'DAY',
        'HOUR', 'MINUTE' and 'SECOND'.
    usernames_df : pandas.DataFrame
        Card-to-user lookup whose key column is 'Card ID' (a frame already
        keyed by 'RFID NUMBER' is accepted too). It is not modified.

    Returns
    -------
    pandas.DataFrame
        The inner join of both inputs on 'RFID NUMBER', with the six
        date/time part columns cast to strings, a datetime 'Date' column
        and a 'Time' column of zero-padded "HH:MM:SS" strings.
    """
    part_columns = ['YEAR', 'MONTH', 'DAY', 'HOUR', 'MINUTE', 'SECOND']

    # Rename on a copy so the caller's lookup frame keeps its 'Card ID' column.
    users = usernames_df.rename(columns={'Card ID': 'RFID NUMBER'})
    clean_df = pd.merge(cardreads_df, users, how='inner', on='RFID NUMBER')

    # The parts are kept as strings so they can be concatenated below.
    clean_df = clean_df.astype({column: str for column in part_columns})
    clean_df['Date'] = clean_df['DAY'] + "/" + clean_df['MONTH'] + "/" + clean_df['YEAR']
    clean_df['Time'] = clean_df['HOUR'] + ":" + clean_df['MINUTE'] + ":" + clean_df['SECOND']

    # The Date strings are day-first, so the format is stated explicitly;
    # month-first inference would misread days 1-12 as months.
    clean_df['Date'] = pd.to_datetime(clean_df['Date'], format="%d/%m/%Y")
    # Round-tripping Time through datetime zero-pads it to HH:MM:SS.
    clean_df['Time'] = pd.to_datetime(clean_df['Time'], format="%H:%M:%S").dt.strftime("%H:%M:%S")
    return clean_df

In [None]:
# Set up a connection to a PostgreSQL database and create a function to load the transformed data into the database.
# The connection URI is read from the RFID_DB_URI environment variable so that
# real credentials stay out of the notebook; the placeholder URI below is only
# the fallback used when the variable is not set.

connection_uri = os.environ.get("RFID_DB_URI", "postgresql://user:password@localhost:5432/rfid")
db_engine = sqlalchemy.create_engine(connection_uri)

def load_data_to_sql(alldata):
    """Append the transformed scan data to the ``rfid_hits`` table.

    The frame is written through the module-level ``db_engine`` into the
    ``rfid_hits`` schema; rows are appended to whatever the table already
    holds. The result of ``DataFrame.to_sql`` is returned unchanged.
    """
    return alldata.to_sql(
        name="rfid_hits",
        con=db_engine,
        schema="rfid_hits",
        if_exists="append",
    )

## Create an ETL Function and Directed Acyclic Graph (DAG) to Schedule with Apache Airflow

In [None]:
def etl():
    """Run one extract-transform-load pass over the RFID csv files."""
    # Extract: both source files are re-read on every run.
    scans = pd.read_csv('card_reads.csv')
    users = pd.read_csv('usernames.csv')

    # Transform the raw frames, then load the result into the database.
    load_data_to_sql(transform_dfs(scans, users))

In [None]:
# Define the DAG to use with Apache Airflow.
# NOTE: DAG and PythonOperator come from the airflow imports that are commented
# out in the import cell; they must be enabled wherever this code is deployed.
# This is so the pipeline runs on a daily basis.
# The schedule interval is set to run every day at midnight to update the data (using a cron expression).
# start_date is supplied because Airflow needs one to schedule the DAG's tasks.
# catchup=False stops Airflow from back-filling a run for every day since
# start_date: each run appends the full csv, so back-filled runs would only
# add duplicate rows.
dag = DAG(dag_id="rfid",
          schedule_interval="0 0 * * *",
          start_date=datetime(2025, 3, 29),
          catchup=False)

# The PythonOperator is used to run the etl function:
etl_task = PythonOperator(
    task_id="rfid_etl",
    python_callable=etl,
    dag=dag)

The DAG is then put into a .py file and placed in the ~/airflow/dags/ folder for scheduling in Airflow.

In [9]:
!jupyter nbconvert --to webpdf --allow-chromium-download rfid_data_engineering_project.ipynb

[NbConvertApp] Converting notebook rfid_data_engineering_project.ipynb to webpdf
[NbConvertApp] Building PDF
You are using a frozen ffmpeg browser which does not receive updates anymore on mac12. Please update to the latest version of your operating system to test up-to-date browsers.
[NbConvertApp] PDF successfully created
[NbConvertApp] Writing 171171 bytes to rfid_data_engineering_project.pdf
