In [1]:
import requests
import pandas as pd
import boto3
from datetime import datetime as dt
from datetime import timedelta as td
import time



In [2]:

# Reset the control record for the 'ghactivity_ingest' job in the `jobs` table.
dynamodb = boto3.resource('dynamodb')
table= dynamodb.Table('jobs')
# Remove any existing item for this job before writing the baseline record below.
table.delete_item(Key={'job_id':'ghactivity_ingest'})
item = {
     'job_id': 'ghactivity_ingest',
     'job_description': 'Ingest ghactivity data to s3',
     'is_active': 'Y',
     # Read by get_next_file_name to pick the first file when no bookmark exists.
     'baseline_days': 3,
}

# Last expression of the cell: the put_item response is displayed as the cell output.
table.put_item(Item=item)

    
      

{'ResponseMetadata': {'RequestId': 'IV7BCA3GQOPL438OCL7AHKRNN7VV4KQNSO5AEMVJF66Q9ASUAAJG',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'server': 'Server',
   'date': 'Sat, 19 Aug 2023 07:44:48 GMT',
   'content-type': 'application/x-amz-json-1.0',
   'content-length': '2',
   'connection': 'keep-alive',
   'x-amzn-requestid': 'IV7BCA3GQOPL438OCL7AHKRNN7VV4KQNSO5AEMVJF66Q9ASUAAJG',
   'x-amz-crc32': '2745614147'},
  'RetryAttempts': 0}}

In [13]:
def get_job_details(job_name):
    """Fetch the item from the `jobs` table whose `job_id` equals `job_name`.

    Returns the value stored under the 'Item' key of the get_item response.
    A response without an 'Item' key raises KeyError.
    """
    jobs_table = boto3.resource('dynamodb').Table('jobs')
    response = jobs_table.get_item(Key={'job_id': job_name})
    return response['Item']
      

In [11]:
def get_next_file_name(job_details):
    """Work out which hourly archive file the job should ingest next.

    Args:
        job_details: mapping for the job. Must contain 'baseline_days'
            (anything int() accepts, e.g. a Decimal). May contain
            'job_run_bookmark_details' with a 'last_run_file_name' entry,
            either a bare file name ('2023-08-17-0.json.gz') or a full
            URI/path ending in that file name.

    Returns:
        (job_start_time, next_file_name): the current local time as integer
        epoch seconds, and a name of the form 'YYYY-MM-DD-H.json.gz' where
        H is the hour without zero padding.

    Raises:
        KeyError: if 'baseline_days' or 'last_run_file_name' is missing.
        ValueError: if the bookmarked file name does not start with a
            'YYYY-MM-DD-H' timestamp.
    """
    job_start_time = int(time.mktime(dt.now().timetuple()))
    job_run_bookmark_details = job_details.get('job_run_bookmark_details')
    baseline_days = int(job_details['baseline_days'])
    if job_run_bookmark_details:
        # The bookmark may hold a full 's3://bucket/folder/<file>' URI, so keep
        # only the final path component before extracting the timestamp.
        last_file_name = job_run_bookmark_details['last_run_file_name'].rsplit('/', 1)[-1]
        dt_part = last_file_name.split('.')[0]
        # File names carry one hour field, so the next file is one hour later;
        # timedelta rolls over to the next day after hour 23.
        next_file_dt = dt.strptime(dt_part, '%Y-%m-%d-%H') + td(hours=1)
        # Build the unpadded hour from .hour instead of the non-portable '%-H'.
        next_file_name = f"{next_file_dt.strftime('%Y-%m-%d')}-{next_file_dt.hour}.json.gz"
    else:
        # No bookmark yet: start at hour 0 of the day `baseline_days` ago.
        baseline_date = dt.now().date() - td(days=baseline_days)
        next_file_name = f"{baseline_date.strftime('%Y-%m-%d')}-0.json.gz"

    return job_start_time, next_file_name


In [15]:
# Load the 'ghactivity_ingest' job record and display it as the cell output.
job_details = get_job_details('ghactivity_ingest')
job_details

{'job_description': 'Ingest ghactivity data to s3',
 'is_active': 'Y',
 'job_id': 'ghactivity_ingest',
 'baseline_days': Decimal('3')}

In [16]:
# Capture this run's start timestamp and the name of the next file to ingest.
job_start_time,next_file_name = get_next_file_name(job_details)

print(job_start_time,next_file_name)

1692518480 2023-08-17-0.json.gz


In [17]:
# Destination bucket and key prefix, passed to upload_file_to_s3 below.
bucket_name = 'aigithubs'
folder = 'landing/ghactivity'

In [18]:

def upload_file_to_s3(file_name,bucket_name,folder):
    """Download one file from data.gharchive.org and store it in S3.

    Args:
        file_name: archive file name, appended to 'https://data.gharchive.org/'.
        bucket_name: destination S3 bucket.
        folder: key prefix inside the bucket; the object is written to
            '<folder>/<file_name>'.

    Returns:
        dict with 'last_run_file_name' (the s3:// URI of the uploaded object)
        and 'status_code' (HTTP status code of the put_object response).

    Raises:
        requests.HTTPError: if the download returns a 4xx/5xx status.
    """
    print(f'Getting the {file_name} from gharchive')
    res = requests.get(f'https://data.gharchive.org/{file_name}')
    # Stop here on a failed download; otherwise the error body would be
    # uploaded to S3 under the archive's name.
    res.raise_for_status()
    print(f'Uploading {file_name} to s3 under s3://{bucket_name}/{folder}')
    # Build the key from `folder` so the object lands where the returned URI
    # (and the log line above) say it does.
    object_key = f'{folder}/{file_name}'
    s3_client = boto3.client('s3')
    upload_res = s3_client.put_object(
        Bucket=bucket_name,
        Key=object_key,
        Body=res.content
    )
    return {'last_run_file_name':f's3://{bucket_name}/{object_key}',
            'status_code':upload_res['ResponseMetadata']['HTTPStatusCode']
            }
    
    
    

In [21]:
# Download the selected file and upload it to S3; keep the returned bookmark dict.
job_run_details = upload_file_to_s3(next_file_name,bucket_name,folder)

Getting the 2023-08-17-0.json.gz from gharchive
Uploading 2023-08-17-0.json.gz to s3 under s3://aigithubs/landing/ghactivity


In [32]:
# Inspect the upload result and the job record before persisting them.
print(job_run_details)
print(job_details)

{'last_run_file_name': 's3://aigithubs/landing/ghactivity/2023-08-17-0.json.gz', 'status_code': 200}
{'job_description': 'Ingest ghactivity data to s3', 'is_active': 'Y', 'job_id': 'ghactivity_ingest', 'baseline_days': Decimal('3')}


In [38]:
def save_job_run_details(job_details,job_run_details,job_start_time):
    """Persist the outcome of one run to DynamoDB.

    Writes a run record to the `jobs_run_details` table, then stores
    `job_run_details` on `job_details` under 'job_run_bookmark_details'
    and writes that item to the `jobs` table.

    Note: `job_details` is modified in place.
    """
    db = boto3.resource('dynamodb')

    # Run-history record for this execution.
    run_record = {
        'job_id': job_details['job_id'],
        'job_run_time': job_start_time,
        'job_run_bookmark_details': job_run_details,
        'create_ts': int(time.mktime(dt.now().timetuple())),
    }
    db.Table('jobs_run_details').put_item(Item=run_record)

    # Attach the bookmark to the job record and write the whole item back.
    job_details['job_run_bookmark_details'] = job_run_details
    db.Table('jobs').put_item(Item=job_details)
    

In [39]:
# Record this run in `jobs_run_details` and update the bookmark on the `jobs` item.
save_job_run_details(job_details,job_run_details,job_start_time)

In [30]:
# List the objects under the landing prefix with the AWS CLI, interpolating the notebook variables.
!aws s3 ls s3://{bucket_name}/{folder} --recursive

2023-08-18 16:35:01   73291345 landing/ghactivity/2023-08-15-0.json.gz
2023-08-19 21:22:23   75643003 landing/ghactivity/2023-08-16-0.json.gz
2023-08-20 13:32:11   64427174 landing/ghactivity/2023-08-17-0.json.gz


In [2]:
# Print each table resource exposed by the DynamoDB resource's `tables` collection.
dynamodb= boto3.resource('dynamodb')
itr = dynamodb.tables.iterator()

# NOTE(review): this loop rebinds the notebook-level name `table` used by the setup cell.
for table in itr:
    print(table)

dynamodb.Table(name='jobs')
dynamodb.Table(name='jobs_run_details')


In [40]:
# Display the contents of the `jobs_run_details` table.
# NOTE(review): this rebinds `job_details` from the job-record dict to a Table
# resource, so re-running earlier cells that expect the dict will fail until
# get_job_details is called again - consider a distinct name.
# NOTE(review): a single scan() call is used; presumably the table is small
# enough to come back in one page - confirm before relying on this for larger tables.
job_details = dynamodb.Table('jobs_run_details')
job_details.scan()

{'Items': [{'job_id': 'ghactivity_ingest',
   'job_run_bookmark_details': {'processed_file_name': '2022-06-03-0.json.gz'},
   'job_run_time': Decimal('1691673936')},
  {'create_ts': Decimal('1692519331'),
   'job_id': 'ghactivity_ingest',
   'job_run_bookmark_details': {'last_run_file_name': 's3://aigithubs/landing/ghactivity/2023-08-17-0.json.gz',
    'status_code': Decimal('200')},
   'job_run_time': Decimal('1692518480')}],
 'Count': 2,
 'ScannedCount': 2,
 'ResponseMetadata': {'RequestId': '3I7OEMGFI0ICF7TF7GEPIRV5RRVV4KQNSO5AEMVJF66Q9ASUAAJG',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'server': 'Server',
   'date': 'Sun, 20 Aug 2023 08:16:02 GMT',
   'content-type': 'application/x-amz-json-1.0',
   'content-length': '442',
   'connection': 'keep-alive',
   'x-amzn-requestid': '3I7OEMGFI0ICF7TF7GEPIRV5RRVV4KQNSO5AEMVJF66Q9ASUAAJG',
   'x-amz-crc32': '192058382'},
  'RetryAttempts': 0}}