In [34]:
import boto3
import os
import pandas as pd

# AWS links  
- IAM: 
https://us-east-1.console.aws.amazon.com/iam/home?region=eu-central-1#/users/details/user1?section=permissions  
- S3:
https://eu-central-1.console.aws.amazon.com/s3/home?region=eu-central-1#  
- Firehose:
https://eu-central-1.console.aws.amazon.com/firehose/home?region=eu-central-1#/streams  
- Kinesis:
https://eu-central-1.console.aws.amazon.com/kinesis/home?region=eu-central-1#/dashboard  

In [2]:
AWS_KEY_ID = os.environ['AWS_KEY_ID']
AWS_SECRET = os.environ['AWS_SECRET']
# User: arn:aws:iam::226802162969:user/user1

# Managing Firehose delivery streams

In [3]:
firehose = boto3.client('firehose',
                        aws_access_key_id=AWS_KEY_ID, 
                        aws_secret_access_key=AWS_SECRET, 
                        region_name='eu-central-1') # , endpoint_url=endpoints['FIREHOSE'])
firehose

<botocore.client.Firehose at 0x1f8b088bd90>

In [5]:
# Get list of delivery streams
response = firehose.list_delivery_streams()
response

{'DeliveryStreamNames': [],
 'HasMoreDeliveryStreams': False,
 'ResponseMetadata': {'RequestId': 'e81e95cf-6cae-384e-88f7-1576f624543f',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': 'e81e95cf-6cae-384e-88f7-1576f624543f',
   'x-amz-id-2': 'hzdOEgMoGPt5qitAFiUJgIA5Pot5fenkMzRHAHYJlWhIJdVyYnz9n6Hc7xcfxdnvhZTTKwkdMv1clSuO98veK1m3l9Hvyymy',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '57',
   'date': 'Thu, 10 Jul 2025 08:48:57 GMT'},
  'RetryAttempts': 0}}

In [6]:
# Iterate over the response contents and delete every stream
for stream_name in response['DeliveryStreamNames']:
    firehose.delete_delivery_stream(DeliveryStreamName=stream_name)
    print(f"Deleted stream: {stream_name}")

# Print list of delivery streams
print(firehose.list_delivery_streams())

{'DeliveryStreamNames': [], 'HasMoreDeliveryStreams': False, 'ResponseMetadata': {'RequestId': 'e1cf7081-7246-30d3-8126-f03fe8cc5ca2', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': 'e1cf7081-7246-30d3-8126-f03fe8cc5ca2', 'x-amz-id-2': '3jV6oHw5wy2d91g1fUla/yNFBdjGLtZkkbua11Is7O1PwD3XYm8ajeS1k1PCUFADuNMNAoEsfHIC54LF37Oe5McFJ8jZPd1X', 'content-type': 'application/x-amz-json-1.1', 'content-length': '57', 'date': 'Thu, 10 Jul 2025 08:49:02 GMT'}, 'RetryAttempts': 0}}


# Getting ready for the first stream with S3
- add the permission of AmazonKinesisFirehoseFullAccess to user1 by using 'Attach policies directly'
- for storing in place except us-east-1 we have to provide location

In [8]:
s3 = boto3.client('s3',
            aws_access_key_id=AWS_KEY_ID, 
            aws_secret_access_key=AWS_SECRET, 
            region_name='eu-central-1')
s3

<botocore.client.S3 at 0x1f8b15b1110>

## Creating roles
- Create role of AWS service for 'Firehose' usecase and adding AmazonS3FullAccess and name it 'firehoseDeliveryRole1'

## S3 bucket creation

In [13]:
# Create the new sd-vehicle-data bucket
s3.create_bucket(Bucket='adelramezani-vehicle-data',
                 CreateBucketConfiguration={'LocationConstraint': 'eu-central-1'}) 

{'ResponseMetadata': {'RequestId': 'M4DNRXW5C2SVZSXQ',
  'HostId': 'hgzPaH2Jzu84RMbb8wNdF9PWQ7tAOW/EPtmnbd3AMMUC+jYGUv0/jglQnxEH4ahXl8juyZv8pk2PrFacMvKF0g==',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amz-id-2': 'hgzPaH2Jzu84RMbb8wNdF9PWQ7tAOW/EPtmnbd3AMMUC+jYGUv0/jglQnxEH4ahXl8juyZv8pk2PrFacMvKF0g==',
   'x-amz-request-id': 'M4DNRXW5C2SVZSXQ',
   'date': 'Thu, 10 Jul 2025 08:58:56 GMT',
   'location': 'http://adelramezani-vehicle-data.s3.amazonaws.com/',
   'content-length': '0',
   'server': 'AmazonS3'},
  'RetryAttempts': 0},
 'Location': 'http://adelramezani-vehicle-data.s3.amazonaws.com/'}

In [15]:
# List the buckets in S3
for bucket_info in s3.list_buckets()['Buckets']:
    
    # Get the bucket_name
    bucket_name = bucket_info['Name']
    
    # Generate bucket ARN.
    arn = "arn:aws:s3:::{}".format(bucket_name)
    
    # Print the ARN
    print(arn)

arn:aws:s3:::adelramezani-kaggle
arn:aws:s3:::adelramezani-vehicle-data
arn:aws:s3:::aws-glue-assets-226802162969-us-east-1


# Working with the Firehose delivery stream
- get the role firehoseDeliveryRole1 arn : arn:aws:iam::226802162969:role/firehoseDeliveryRole1
- user1/permissions/create inline policy to PassRole to firehose service
  iamPassRole-ToService-firehose
{
	"Version": "2012-10-17",
	"Statement": [
		{
			"Effect": "Allow",
			"Action": "iam:PassRole",
			"Resource": "arn:aws:iam::226802162969:role/firehoseDeliveryRole1",
			"Condition": {
				"StringEquals": {
					"iam:PassedToService": "firehose.amazonaws.com"
				}
			}
		}
	]
}

In [17]:
res = firehose.create_delivery_stream(
    DeliveryStreamName= 'gps-devlivery-stream',
    DeliveryStreamType= 'DirectPut',
    S3DestinationConfiguration= {
        'RoleARN': 'arn:aws:iam::226802162969:role/firehoseDeliveryRole1',
        'BucketARN': 'arn:aws:s3:::adelramezani-vehicle-data'
    }
)
res

{'DeliveryStreamARN': 'arn:aws:firehose:eu-central-1:226802162969:deliverystream/gps-devlivery-stream',
 'ResponseMetadata': {'RequestId': 'd2183103-46e9-be7a-b2f1-95df368b1812',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': 'd2183103-46e9-be7a-b2f1-95df368b1812',
   'x-amz-id-2': 'RDw9zVDf35zsAwduSfQ98omDaTbJ2B2xP7FhmutUnW1wsDbMT9Y94SV0BTHe7mPYBrcR+UbCp1gnsYuX8TebprBQzcRsHMbN',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '102',
   'date': 'Thu, 10 Jul 2025 11:23:08 GMT'},
  'RetryAttempts': 0}}

In [18]:
print(res['DeliveryStreamARN'])

arn:aws:firehose:eu-central-1:226802162969:deliverystream/gps-devlivery-stream


In [26]:
# Single record
record={    'record_id': '939ed1d1-1740-420c-8906-445278573c7f', # unique record id
     'timestamp': '4:25:06.000', # time of measurement
     'vin': '4FTEX4944AK844294', # vehicle id
     'lon': 106.9447146, # vehicle location longitude
     'lat': -6.3385652, # vehicle location latitude
     'speed': 25 # vehicle speed
}
payload = ' '.join(str(value) for value in record.values())
payload

'939ed1d1-1740-420c-8906-445278573c7f 4:25:06.000 4FTEX4944AK844294 106.9447146 -6.3385652 25'

In [29]:
# sending a record
res = firehose.put_record(
    DeliveryStreamName= 'gps-devlivery-stream',
    Record= {
        'Data': payload + '\n' 
    }
)
res

{'RecordId': '9JYi8lPLum3r2Lkrj0vykpKKDaPVnc59OWLXbz9amy5FG4ABhzE2lhmpk8ByvsmJvdzXp6TEZDQ9f7O+hSc5gDPn6Vet7nqeuz9R/HtWnxABmsO0toR5tZR6qH9NYLsgfdtrD2P7qtA7cyDmwuq9vjwyUVvQpohvTMvOT5DoMhjOlfdzLiSPh8t6tDfO6wD123lKVNlI9o+cqd/W6Y4RpzSYXgezeiQp',
 'Encrypted': False,
 'ResponseMetadata': {'RequestId': 'cb3f7f7d-5034-6344-abd6-d239538561f0',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': 'cb3f7f7d-5034-6344-abd6-d239538561f0',
   'x-amz-id-2': 'ra2l0xmdGvFysrM92WceVoHFXpdqCZ0/MzV+89feMdTQetd+J8DaQ6Q2UPB9UVuetWhjObIm3xhmLv5b4Om6nOhg0nPQFNNO',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '257',
   'date': 'Thu, 10 Jul 2025 11:59:00 GMT'},
  'RetryAttempts': 0}}

In [33]:
# Get the object from s3
#S3_DATA_KEY = 's3://adelramezani-vehicle-data/2025/07/10/11/gps-devlivery-stream-1-2025-07-10-11-58-09-2e9d9388-b08b-47a7-bbe3-86b431f530c7'
#S3_DATA_KEY = 'gps-devlivery-stream-1-2025-07-10-11-58-09-2e9d9388-b08b-47a7-bbe3-86b431f530c7'
S3_DATA_KEY = '2025/07/10/11/gps-devlivery-stream-1-2025-07-10-11-58-09-2e9d9388-b08b-47a7-bbe3-86b431f530c7'
obj_data = s3.get_object(Bucket= 'adelramezani-vehicle-data', Key= S3_DATA_KEY)
obj_data

{'ResponseMetadata': {'RequestId': 'B0NVPAXKVVSQD05W',
  'HostId': 'xBERpJG0LWMEuzy44I6LAge+Q6iqfM/qXJXrR/+GeggGT/I73+Vw5Fs8AZLylj+coPv9zf46XjM=',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amz-id-2': 'xBERpJG0LWMEuzy44I6LAge+Q6iqfM/qXJXrR/+GeggGT/I73+Vw5Fs8AZLylj+coPv9zf46XjM=',
   'x-amz-request-id': 'B0NVPAXKVVSQD05W',
   'date': 'Thu, 10 Jul 2025 12:19:05 GMT',
   'last-modified': 'Thu, 10 Jul 2025 12:03:10 GMT',
   'etag': '"7430a53e72d0f591d93e1ad2ba2bb777"',
   'x-amz-server-side-encryption': 'AES256',
   'accept-ranges': 'bytes',
   'content-type': 'application/octet-stream',
   'content-length': '277',
   'server': 'AmazonS3'},
  'RetryAttempts': 0},
 'AcceptRanges': 'bytes',
 'LastModified': datetime.datetime(2025, 7, 10, 12, 3, 10, tzinfo=tzutc()),
 'ContentLength': 277,
 'ETag': '"7430a53e72d0f591d93e1ad2ba2bb777"',
 'ContentType': 'application/octet-stream',
 'ServerSideEncryption': 'AES256',
 'Metadata': {},
 'Body': <botocore.response.StreamingBody at 0x1f8b3ca3730>}

In [36]:
# read data into Dataframe
vehicle_data = pd.read_csv(
    obj_data['Body'], 
    delimiter = ' ', 
    names = ['record_id', 'timestamp', 'vin', 'lon', 'lat', 'speed' ]
)
vehicle_data

Unnamed: 0,Unnamed: 1,Unnamed: 2,Unnamed: 3,Unnamed: 4,Unnamed: 5,Unnamed: 6,Unnamed: 7,Unnamed: 8,Unnamed: 9,record_id,timestamp,vin,lon,lat,speed
939ed1d1-1740-420c-8906-445278573c7f,4:25:06.000,4FTEX4944AK844294,106.944715,-6.338565,25939ed1d1-1740-420c-8906-445278573c7f,4:25:06.000,4FTEX4944AK844294,106.944715,-6.338565,25939ed1d1-1740-420c-8906-445278573c7f,4:25:06.000,4FTEX4944AK844294,106.944715,-6.338565,25


# Writing to a Firehose stream

In [40]:
# OBD2_sensors.py: Write to Firehose stream. EDIT HERE.
import _setup, create_firehose
firehose, s3, records = _setup.ex_vars
for idx, row in records.iterrows(): 

    # Create a payload string that ends with a newline
    payload = ' '.join(str(value) for value in row) 
    payload = payload + "\n"
    print("Sending payload: {}".format(payload))

    # Send the payload string to Firehose stream
    res = firehose.put_record(
        DeliveryStreamName = 'gps-delivery-stream',
        Record = {'Data': payload})

    # Print the written RecordId
    print("Wrote to RecordId: {}".format(res['RecordId']))


Sending payload: 939ed1d1-1740-420c-8906-445278573c7f 4:25:06.000 4FTEX4944AK844294 106.9447146 -6.3385652 25

Wrote to RecordId: O+XRgIhmvL4It+YjrQOM1x1HfvSsq7cAUCw3JijqCP/k0KAOk+HpgcgrWOAQsN/DxN1bouMsx4Jg+erfFc//94gMcXZboXUmvIc36Omqerny8YxuVUzP7qcRUmYTuYRmRm2rDNfvmooz7ow0Kv4bEeH4VbJ8BIwzyZPk7NCUdreplFToVKrU4AGZV9AjXbhyJF8Jk98I4xdaDxWiJMd5/KeHNnBut9tR
Sending payload: f29a5b3d-d0fa-43c0-9e1a-e2a5cdb8be7a 8:10:47.000 3FTEX1G5XAK844393 108.580681 34.79925 37

Wrote to RecordId: b37vq+oE3cWN60q5UL//fEaEvL4t+TDbYFjVXfjerOWwwIEbjJQu+tlSynytHqAyr21Vh8yYFR6gigDukpM5I32QW3uJL7PG+4x1RLrw7s4PuxK/ibjc0E4j9uRAFYwNjoZITB4wZEI3CfO8zMtJUwgmXEG7UiJWoRw9HusV0o5X38Hsr3uG2ikzDQPqIMmZDUNql1EuOnIRAtr19G54tjjFqajRAxGT
Sending payload: ff8e7131-408d-463b-8d07-d016419b0656 20:26:44.000 2LAXX1C8XAK844292 114.392392 36.097577 90

Wrote to RecordId: q/f2gsEBC5PQ6T3nuM5an3MT3JIoy7BbFY9wpKAJo5KjaGr/K0f7blmDLJYCgyKFvX+0rtwt+oe0pQSvJ7CtbEcFFzADG64YlRBNhQXWWcKw7l1eu+vE4uMVbd7h3LGMIUS8vkVk9W3CWfq93VSW1nPLZoNhDvm6Tuxl

# Reading Firehose data

In [42]:
# analyze_data.py: Analyze written sensor data. EDIT HERE.
import _setup, _run_deps, pandas as pd
firehose, s3, records = _setup.ex_vars

# List the objects that have been written to the S3 bucket
objects = s3.list_objects(Bucket='adelramezani-vehicle-data')['Contents']

# Create list for collecting dataframes from read files.
dfs = []

# For every object, load it from S3
for obj in objects:
    print(obj)
    data_file = s3.get_object(Bucket='adelramezani-vehicle-data', Key=obj['Key'])

    # Load it into a dataframe, specifying a delimiter and column names
    dfs.append(pd.read_csv(data_file['Body'], 
                           delimiter = " ", 
                           names=["record_id", "timestamp", "vin", "lon", "lat", "speed"]))

# Concatenate the resulting dataframes.
data = pd.concat(dfs)
data

{'Key': '2025/07/10/11/gps-devlivery-stream-1-2025-07-10-11-58-09-2e9d9388-b08b-47a7-bbe3-86b431f530c7', 'LastModified': datetime.datetime(2025, 7, 10, 12, 3, 10, tzinfo=tzutc()), 'ETag': '"7430a53e72d0f591d93e1ad2ba2bb777"', 'ChecksumAlgorithm': ['CRC32'], 'Size': 277, 'StorageClass': 'STANDARD', 'Owner': {'ID': '48ce1ddf9c77fbb8e1705cca3f76f7c7932c02939a830cba114475a66c152a08'}}
{'Key': '2025/07/10/12/gps-delivery-stream-1-2025-07-10-12-56-45-e7720ff5-5dbc-43e8-a6fb-1694106e9081', 'LastModified': datetime.datetime(2025, 7, 10, 13, 1, 46, tzinfo=tzutc()), 'ETag': '"801fc55f7ef3d6ec70358904aeddc0aa"', 'ChecksumAlgorithm': ['CRC32'], 'Size': 9196, 'StorageClass': 'STANDARD', 'Owner': {'ID': '48ce1ddf9c77fbb8e1705cca3f76f7c7932c02939a830cba114475a66c152a08'}}


Unnamed: 0,record_id,timestamp,vin,lon,lat,speed
"(939ed1d1-1740-420c-8906-445278573c7f, 4:25:06.000, 4FTEX4944AK844294, 106.9447146, -6.3385652, 25939ed1d1-1740-420c-8906-445278573c7f, 4:25:06.000, 4FTEX4944AK844294, 106.9447146, -6.3385652)",25939ed1d1-1740-420c-8906-445278573c7f,4:25:06.000,4FTEX4944AK844294,106.944715,-6.338565,25
0,939ed1d1-1740-420c-8906-445278573c7f,4:25:06.000,4FTEX4944AK844294,106.944715,-6.338565,25
1,f29a5b3d-d0fa-43c0-9e1a-e2a5cdb8be7a,8:10:47.000,3FTEX1G5XAK844393,108.580681,34.799250,37
2,ff8e7131-408d-463b-8d07-d016419b0656,20:26:44.000,2LAXX1C8XAK844292,114.392392,36.097577,90
3,bc75da5f-1bf6-444c-80ad-49c180e1b8de,23:16:06.000,3FTEX1G5XAK844393,-76.699017,2.481207,40
...,...,...,...,...,...,...
95,2e29eb3b-a505-4c19-b6f2-247c49f1e203,23:45:23.000,2LAXX1C8XAK844292,0.742312,45.190519,45
96,a91c95c2-8070-46a5-b4d4-3aaa002dcec3,8:36:04.000,5FTEX1MAXAK844295,113.121300,-8.129700,59
97,230f2ad2-c1ef-4d78-8f89-c400e8e4b252,1:29:11.000,3FTEX1G5XAK844393,-8.286948,41.282851,12
98,a883c2d1-d3be-4df2-b0b7-bf37fce39049,20:19:58.000,4FTEX4944AK844294,-8.646509,41.260025,14


In [43]:
print(data.groupby(['vin'])['speed'].max())

vin
1FTEX1C8XAK855191     91
2LAXX1C8XAK844292     98
3FTEX1G5XAK844393     96
4FTEX4944AK844294    100
5FTEX1MAXAK844295     99
Name: speed, dtype: int64
