In [55]:
import boto3
from datetime import datetime as dt 
from datetime import timedelta as td
import time
import requests
import pandas as pd

In [56]:
dynamo_client = boto3.client('dynamodb')
dynamo_resource = boto3.resource('dynamodb')
s3_client = boto3.client('s3')
s3_resource = boto3.resource('s3')
job_details_table = dynamo_resource.Table('jobs')
job_run_details_table = dynamo_resource.Table('job_run_details')

In [57]:
job_details_table.delete_item(Key = {'job_id':'ghactivity_ingest'})
item = {
    'job_id': 'ghactivity_ingest',
    'job_description': 'Ingest ghactivity data to s3',
    'is_active': 'Y',
    'baseline_days': 3
    }
job_details_table.put_item(Item=item)

{'ResponseMetadata': {'RequestId': 'P6C3E2KG88Q7FTOGB2H7R75CCVVV4KQNSO5AEMVJF66Q9ASUAAJG',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'server': 'Server',
   'date': 'Wed, 27 Sep 2023 02:26:19 GMT',
   'content-type': 'application/x-amz-json-1.0',
   'content-length': '2',
   'connection': 'keep-alive',
   'x-amzn-requestid': 'P6C3E2KG88Q7FTOGB2H7R75CCVVV4KQNSO5AEMVJF66Q9ASUAAJG',
   'x-amz-crc32': '2745614147'},
  'RetryAttempts': 0}}

In [61]:
def get_job_details(job_name, job_details_table):
    #job_details_table = dynamo_resource.Table('jobs')
    job_details_table = job_details_table.get_item(Key={'job_id':job_name})['Item']
    return job_details_table

In [86]:

def get_next_file(job_details):
    job_start_time = int(time.mktime(dt.now().timetuple()))
    job_run_bookmark_details = job_details.get('job_run_bookmark_details')
    baseline_days = int(job_details['baseline_days'])
    if job_run_bookmark_details:
        dt_part = job_run_bookmark_details['last_run_file_name'].split('.')[0].split('/')[-1]
        #dt_part = file_name.split('/')[-1]
        next_file_name = f'{dt.strftime(dt.strptime(dt_part, "%Y-%m-%d-%H") + td(hours=1), "%Y-%m-%d-%-H")}.json.gz' 
    else:
        next_file_name = f'{dt.strftime(dt.now().date() - td(days=baseline_days), format="%Y-%m-%d")}-0.json.gz'

    return job_start_time, next_file_name


In [92]:
job_details = get_job_details('ghactivity_ingest', job_details_table)
job_details

{'job_description': 'Ingest ghactivity data to s3',
 'is_active': 'Y',
 'job_id': 'ghactivity_ingest',
 'baseline_days': Decimal('3'),
 'job_run_bookmark_details': {'last_run_file_name': 's3://github-activity/landing/ghactivity//2023-09-24-0.json.gz',
  'status_code': Decimal('200')}}

In [93]:
job_start_time, next_file = get_next_file(job_details)
job_start_time, next_file

(1695782738, '2023-09-24-1.json.gz')

In [85]:
res = requests.get('https://data.gharchive.org/2023-09-24-1')
res.content

b"<?xml version='1.0' encoding='UTF-8'?><Error><Code>NoSuchKey</Code><Message>The specified key does not exist.</Message></Error>"

In [94]:
#https://data.gharchive.org/2023-09-24-0.json.gz
bucket_name =  'github-activity'
folder = f'landing/ghactivity/'
base_url= 'https://data.gharchive.org/'

def upload_file_to_s3(file_name, bucket_name, folder):
    print(f'Getting the {file_name} from gharchive')
    
    res = requests.get(base_url+file_name)
    print(f'Uploading {file_name} to s3 under s3://{bucket_name}/{folder}')
    if res.status_code == 200:

        upload_res = s3_client.put_object(
            Bucket = bucket_name,
            Key = folder+file_name,
            Body = res.content 
        )

        return {
            'last_run_file_name': f's3://{bucket_name}/{folder}/{file_name}',
            'status_code': upload_res['ResponseMetadata']['HTTPStatusCode']
        }

In [95]:
job_run_details = upload_file_to_s3(next_file, bucket_name, folder)
job_run_details

Getting the 2023-09-24-1.json.gz from gharchive
Uploading 2023-09-24-1.json.gz to s3 under s3://github-activity/landing/ghactivity/


{'last_run_file_name': 's3://github-activity/landing/ghactivity//2023-09-24-1.json.gz',
 'status_code': 200}

In [96]:
def save_job_run_details(job_details, job_run_details, job_start_time, job_run_details_table, job_details_table):
    job_run_details_item = {
        'job_id': job_details['job_id'],
        'job_run_time': job_start_time,
        'job_run_bookmark_details': job_run_details,
        'create_ts': int(time.mktime(dt.now().timetuple()))
    }
    #job_run_details_table = dynamo_resource.Table('job_run_details')
    job_run_details_table.put_item(Item=job_run_details_item)

    #job_details_table = dynamo_resource.Table('jobs')
    job_details['job_run_bookmark_details'] = job_run_details
    job_details_table.put_item(Item=job_details)

In [97]:
save_job_run_details(job_details, job_run_details, job_start_time, job_run_details_table, job_details_table)

In [None]:
s3_file_path= 's3://github-activity/landing/ghactivity/2023-09-24-0.json.gz'
df = pd.read_json(
    s3_file_path,
    lines=True,
    orient='records'
    )

df.head(2)

In [None]:
df.shape