In [None]:
!pip install pandas==1.5.*

In [1]:
%%timeit
import boto3
import os


# Name of the S3 bucket passed to download_s3_folder below.
BUCKET_NAME = 'iot-workshop-publish-ip'
# Key prefix ("folder") inside the bucket whose objects are downloaded.
TOPIC = 'topic/publish_ip'


# Module-level boto3 S3 resource; download_s3_folder reads this global.
s3 = boto3.resource('s3')


def download_s3_folder(bucket_name: str, s3_folder: str, local_dir: str = None) -> None:
    """
    Download every object stored under a key prefix of an S3 bucket.

    Args:
        bucket_name: the name of the s3 bucket
        s3_folder: the folder path (key prefix) in the s3 bucket
        local_dir: a relative or absolute directory path in the local file
            system. When None, each object is written to a relative path
            equal to its full S3 key.

    Returns:
        Nothing, downloads files in the S3 bucket into `local_dir`.
    """
    bucket = s3.Bucket(bucket_name)

    for obj in bucket.objects.filter(Prefix=s3_folder):
        if local_dir is None:
            target = obj.key
        else:
            target = os.path.join(local_dir, os.path.relpath(obj.key, s3_folder))

        # A target without a directory component has an empty dirname, and
        # os.makedirs('') raises; only create a parent when there is one.
        # exist_ok avoids the check-then-create race.
        parent = os.path.dirname(target)
        if parent:
            os.makedirs(parent, exist_ok=True)

        # Keys ending in '/' are treated as folder placeholders: the directory
        # is created above, but there is no file content to download.
        if obj.key.endswith('/'):
            continue
        bucket.download_file(obj.key, target)
#         print(f'Downloaded {obj}.')

# Fetch every object under the TOPIC prefix of BUCKET_NAME. No local_dir is
# given, so files land at relative paths that match their S3 keys.
download_s3_folder(bucket_name=BUCKET_NAME, s3_folder=TOPIC)

KeyboardInterrupt: 

In [None]:
import pandas as pd
import os

# Read the files in the directory
path = './topic/publish_ip/'
# os.listdir returns entries in arbitrary order and also lists sub-directories;
# sort for a reproducible row order and keep regular files only.
files = sorted(
    f for f in os.listdir(path) if os.path.isfile(os.path.join(path, f))
)

# Read json content into pandas series
dfs = [pd.read_json(os.path.join(path, f), typ='series') for f in files]

# Concatenate series and turn into pandas dataframe (one row per file)
df = pd.concat(dfs, axis=1).T


# Processing
def convert_to_timestamp(df: pd.DataFrame, col: str) -> pd.DataFrame:
    """Converts column `col` of `df` to timestamp inferring the datetime format.

    Args:
        df: frame holding the column to convert; it is left unmodified.
        col: label of the column to parse.

    Returns:
        A copy of `df` in which `col` holds the parsed datetimes.
    """
    # Work on a copy so the caller's frame is not altered as a side effect.
    converted = df.copy()
    converted[col] = pd.to_datetime(df[col], infer_datetime_format=True)
    return converted

def get_device_id(df: pd.DataFrame, col: str) -> pd.DataFrame:
    """Extracts the device id in the username column `col` of `df`.

    The id is the last run of consecutive digits in each value, kept as a
    string (e.g. 'done2' -> '2', 'done12' -> '12'). Values containing no
    digit get a missing value.

    Args:
        df: frame holding the username column; it is left unmodified.
        col: label of the string column to extract the id from.

    Returns:
        A copy of `df` with the extracted id stored in a 'device' column.
    """
    result = df.copy()
    # `\d+` captures the whole number rather than a single digit; `\D*$` pins
    # the match to the last digit run. expand=False returns a Series.
    result['device'] = df[col].str.extract(r'(\d+)\D*$', expand=False)
    return result

def keep_latest_timestamp(
    df: pd.DataFrame, timestamp_col: str, keep_col: str, orderby_col: str
) -> pd.DataFrame:
    """Keeps the last subset of `df` based on the `keep_col` ordered by the `timestamp_col`.
    Returned dataframe is ordered by the `orderby_col`.

    Args:
        df: frame to de-duplicate; it is not modified.
        timestamp_col: column that decides which duplicate is the latest.
        keep_col: column whose distinct values each keep exactly one row.
        orderby_col: column the returned frame is sorted by.

    Returns:
        A new frame with one row per distinct `keep_col` value (the one with
        the greatest `timestamp_col`), sorted by `orderby_col`.
    """
    return (
        df
        # Ascending sort puts the latest row of each group last ...
        .sort_values(by=timestamp_col)
        # ... so keep="last" retains exactly that row.
        .drop_duplicates(subset=[keep_col], keep="last")
        # Order by the requested column instead of a fixed column name.
        .sort_values(by=[orderby_col])
    )


# Processing pipeline: parse the timestamps, derive the device id, then keep
# only the most recent row per username, ordered by device.
df = (
    df
    .pipe(convert_to_timestamp, col='Timestamp')
    .pipe(get_device_id, col='username')
    .pipe(
        keep_latest_timestamp,
        timestamp_col='Timestamp',
        keep_col='username',
        orderby_col='device',
    )
)

# Display the resulting frame as the cell output.
df

Unnamed: 0,Timestamp,username,ip
53,2022-10-10 08:51:06,done2,192.168.1.115
59,2022-10-12 03:41:05,done3,192.168.1.117
