In [1]:
import requests

# Mount Google Drive at /content/gdrive; later cells open the CSV and
# client.properties files through "gdrive/My Drive/..." relative paths.
# NOTE(review): those relative paths presumably rely on the working directory
# being /content — confirm.
from google.colab import drive
drive.mount('/content/gdrive')

Mounted at /content/gdrive


In [2]:
!pip install confluent-kafka

Collecting confluent-kafka
  Downloading confluent_kafka-2.4.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (4.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m4.0/4.0 MB[0m [31m30.1 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: confluent-kafka
Successfully installed confluent-kafka-2.4.0


In [3]:
!pip install confluent-kafka configparser

Collecting configparser
  Downloading configparser-7.0.0-py3-none-any.whl (16 kB)
Installing collected packages: configparser
Successfully installed configparser-7.0.0


In [None]:
### For CSV usage

In [None]:
# Value by value publish

import csv
from confluent_kafka import Producer
import json
import time

# Function to read Confluent Cloud configuration from file
def read_ccloud_config(config_file):
    """Parse a ``key=value`` properties file into a dict of client settings.

    Blank lines and lines whose first non-blank character is ``#`` are
    skipped. Each remaining line is split on its first ``=`` only, so values
    may themselves contain ``=``. Whitespace around both the key and the value
    is removed, which lets ``key = value`` lines work and be filtered
    correctly.

    The keys listed in ``omitted_fields`` are left out of the result
    (presumably because the producer client does not accept them — confirm).

    Args:
        config_file: Path of the properties file to read.

    Returns:
        dict mapping setting name to its string value.

    Raises:
        OSError: If the file cannot be opened.
        ValueError: If a non-comment line has no ``=`` separator.
    """
    omitted_fields = {'schema.registry.url', 'basic.auth.credentials.source', 'basic.auth.user.info'}
    conf = {}
    with open(config_file) as fh:
        for line_number, raw_line in enumerate(fh, start=1):
            line = raw_line.strip()
            if not line or line.startswith("#"):
                continue
            parameter, separator, value = line.partition('=')
            if not separator:
                # Fail with the offending location instead of an opaque
                # tuple-unpacking error.
                raise ValueError(
                    f"{config_file}:{line_number}: expected 'key=value', got {line!r}"
                )
            # Strip the key as well, so "key = value" does not yield "key ".
            parameter = parameter.strip()
            if parameter not in omitted_fields:
                conf[parameter] = value.strip()
    return conf

# Function to publish CSV data to Kafka topic
def publish_csv_to_broker(csv_file, topic="topic_0",
                          config_file="gdrive/My Drive/Colab Notebooks/Big Data/client.properties",
                          delay_seconds=1):
    """Publish every cell of a CSV file to Kafka as its own message.

    For each data row, one message is produced per column: the message key is
    the column name and the value is the JSON object ``{column: cell}``.

    Args:
        csv_file: Path of the CSV file; its first row supplies the column names.
        topic: Kafka topic to produce to.
        config_file: Properties file passed to ``read_ccloud_config``.
        delay_seconds: Pause after each message, in seconds.

    Raises:
        OSError: If the CSV or configuration file cannot be opened.
    """
    # Establish connection to Kafka broker
    producer = Producer(read_ccloud_config(config_file))

    def report_delivery(err, msg):
        # produce() only enqueues the message; the broker's verdict arrives
        # later through this callback, invoked from poll()/flush().
        if err is not None:
            print("Error publishing message to Kafka:", err)

    try:
        # Open the CSV file
        with open(csv_file, newline='') as csvfile:
            reader = csv.DictReader(csvfile)

            # Extract column names from the CSV file
            column_names = reader.fieldnames

            # Iterate over each row in the CSV file
            for row in reader:
                try:
                    # Iterate over each column in the row
                    for column in column_names:
                        # Column name is the key, the single cell is the content
                        key = column
                        value = {column: row[column]}

                        # Serialize the value data to JSON
                        serialized_value = json.dumps(value)

                        # Publish message to Kafka topic
                        producer.produce(
                            topic,
                            key=key.encode(),
                            value=serialized_value.encode(),
                            on_delivery=report_delivery,
                        )
                        # Serve delivery callbacks without blocking.
                        producer.poll(0)

                        print("Published CSV data to Kafka:", key, value)
                        time.sleep(delay_seconds)  # Optional: Add delay between each message
                except Exception as e:
                    # Best effort: report the failure and move on to the next row.
                    print("Error publishing message to Kafka:", e)
    finally:
        # Deliver whatever is still queued, even when the loop is interrupted.
        # The Producer class in confluent-kafka 2.4.0 has no close() method;
        # flush() is what drains the queue.
        producer.flush()

# Main function to execute the publishing process
def main():
    """Publish the aqi.csv file stored on Google Drive to the Kafka topic."""
    publish_csv_to_broker("gdrive/My Drive/Colab Notebooks/Big Data/aqi.csv")

# Run the publisher when this cell is executed as the main program.
if __name__ == "__main__":
    main()


Published CSV data to Kafka: Unique ID {'Unique ID': '179772'}
Published CSV data to Kafka: Indicator ID {'Indicator ID': '640'}
Published CSV data to Kafka: Name {'Name': 'Boiler Emissions- Total SO2 Emissions'}
Published CSV data to Kafka: Measure {'Measure': 'Number per km2'}
Published CSV data to Kafka: Measure Info {'Measure Info': 'number'}
Published CSV data to Kafka: Geo Type Name {'Geo Type Name': 'UHF42'}
Published CSV data to Kafka: Geo Join ID {'Geo Join ID': '409'}
Published CSV data to Kafka: Geo Place Name {'Geo Place Name': 'Southeast Queens'}
Published CSV data to Kafka: Time Period {'Time Period': '2015'}
Published CSV data to Kafka: Start_Date {'Start_Date': '01/01/2015'}


KeyboardInterrupt: 

In [4]:
# Row by row publish

import csv
from confluent_kafka import Producer
import json
import time

# Function to read Confluent Cloud configuration from file
def read_ccloud_config(config_file):
    """Parse a ``key=value`` properties file into a dict of client settings.

    Blank lines and lines whose first non-blank character is ``#`` are
    skipped. Each remaining line is split on its first ``=`` only, so values
    may themselves contain ``=``. Whitespace around both the key and the value
    is removed, which lets ``key = value`` lines work and be filtered
    correctly.

    The keys listed in ``omitted_fields`` are left out of the result
    (presumably because the producer client does not accept them — confirm).

    Args:
        config_file: Path of the properties file to read.

    Returns:
        dict mapping setting name to its string value.

    Raises:
        OSError: If the file cannot be opened.
        ValueError: If a non-comment line has no ``=`` separator.
    """
    omitted_fields = {'schema.registry.url', 'basic.auth.credentials.source', 'basic.auth.user.info'}
    conf = {}
    with open(config_file) as fh:
        for line_number, raw_line in enumerate(fh, start=1):
            line = raw_line.strip()
            if not line or line.startswith("#"):
                continue
            parameter, separator, value = line.partition('=')
            if not separator:
                # Fail with the offending location instead of an opaque
                # tuple-unpacking error.
                raise ValueError(
                    f"{config_file}:{line_number}: expected 'key=value', got {line!r}"
                )
            # Strip the key as well, so "key = value" does not yield "key ".
            parameter = parameter.strip()
            if parameter not in omitted_fields:
                conf[parameter] = value.strip()
    return conf

# Function to publish CSV data to Kafka topic
def publish_csv_to_broker(csv_file, topic="topic_0",
                          config_file="gdrive/My Drive/Colab Notebooks/Big Data/client.properties",
                          delay_seconds=1):
    """Publish each row of a CSV file to Kafka as one JSON message.

    The message key is the 1-based position of the row among the data rows
    (as a string) and the value is a JSON object holding every header column
    of that row.

    Args:
        csv_file: Path of the CSV file; its first row supplies the column names.
        topic: Kafka topic to produce to.
        config_file: Properties file passed to ``read_ccloud_config``.
        delay_seconds: Pause after each message, in seconds.

    Raises:
        OSError: If the CSV or configuration file cannot be opened.
    """
    # Establish connection to Kafka broker
    producer = Producer(read_ccloud_config(config_file))

    def report_delivery(err, msg):
        # produce() only enqueues the message; the broker's verdict arrives
        # later through this callback, invoked from poll()/flush().
        if err is not None:
            print("Error publishing message to Kafka:", err)

    try:
        # Open the CSV file
        with open(csv_file, newline='') as csvfile:
            reader = csv.DictReader(csvfile)

            # Extract column names from the CSV file
            column_names = reader.fieldnames

            # Number the records from 1; the number becomes the message key.
            for record_number, row in enumerate(reader, start=1):
                try:
                    key = str(record_number)

                    # Construct the value containing all columns
                    value = {column: row[column] for column in column_names}

                    # Serialize the value data to JSON
                    serialized_value = json.dumps(value)

                    # Publish message to Kafka topic
                    producer.produce(
                        topic,
                        key=key.encode(),
                        value=serialized_value.encode(),
                        on_delivery=report_delivery,
                    )
                    # Serve delivery callbacks without blocking.
                    producer.poll(0)

                    print("Published CSV data to Kafka:", key, value)
                    time.sleep(delay_seconds)  # Optional: Add delay between each message
                except Exception as e:
                    # Best effort: report the failure and move on to the next row.
                    print("Error publishing message to Kafka:", e)
    finally:
        # Deliver whatever is still queued, even when the loop is interrupted.
        # The Producer class in confluent-kafka 2.4.0 has no close() method;
        # flush() is what drains the queue.
        producer.flush()

# Main function to execute the publishing process
def main():
    """Publish the aqi.csv file stored on Google Drive to the Kafka topic."""
    publish_csv_to_broker("gdrive/My Drive/Colab Notebooks/Big Data/aqi.csv")

# Run the publisher when this cell is executed as the main program.
if __name__ == "__main__":
    main()


Published CSV data to Kafka: 1 {'Unique ID': '179772', 'Indicator ID': '640', 'Name': 'Boiler Emissions- Total SO2 Emissions', 'Measure': 'Number per km2', 'Measure Info': 'number', 'Geo Type Name': 'UHF42', 'Geo Join ID': '409', 'Geo Place Name': 'Southeast Queens', 'Time Period': '2015', 'Start_Date': '01/01/2015', 'Data Value': '0.3', 'Message': ''}
Published CSV data to Kafka: 2 {'Unique ID': '179785', 'Indicator ID': '640', 'Name': 'Boiler Emissions- Total SO2 Emissions', 'Measure': 'Number per km2', 'Measure Info': 'number', 'Geo Type Name': 'UHF42', 'Geo Join ID': '209', 'Geo Place Name': 'Bensonhurst - Bay Ridge', 'Time Period': '2015', 'Start_Date': '01/01/2015', 'Data Value': '1.2', 'Message': ''}
Published CSV data to Kafka: 3 {'Unique ID': '178540', 'Indicator ID': '365', 'Name': 'Fine particles (PM 2.5)', 'Measure': 'Mean', 'Measure Info': 'mcg/m3', 'Geo Type Name': 'UHF42', 'Geo Join ID': '209', 'Geo Place Name': 'Bensonhurst - Bay Ridge', 'Time Period': 'Annual Average 2

KeyboardInterrupt: 

In [None]:
### For API usage

In [None]:
import os

import requests

url = "https://zillow56.p.rapidapi.com/search"

querystring = {"location": "houston, tx"}

# The API key is read from the environment so it is never stored in the
# notebook. A key that was previously committed in this cell should be
# treated as leaked and rotated.
rapidapi_key = os.environ.get("RAPIDAPI_KEY")
if not rapidapi_key:
    raise RuntimeError("Set the RAPIDAPI_KEY environment variable before running this cell.")

headers = {
    "X-RapidAPI-Key": rapidapi_key,
    "X-RapidAPI-Host": "zillow56.p.rapidapi.com"
}

# A timeout keeps the cell from hanging forever on an unresponsive server.
response = requests.get(url, headers=headers, params=querystring, timeout=30)
# Stop here on an HTTP error instead of passing an error body downstream.
response.raise_for_status()

print(response.json())

{'results': [{'bathrooms': 2.0, 'bedrooms': 3.0, 'city': 'Houston', 'country': 'USA', 'currency': 'USD', 'daysOnZillow': 0, 'homeStatus': 'FOR_SALE', 'homeStatusForHDP': 'FOR_SALE', 'homeType': 'SINGLE_FAMILY', 'imgSrc': 'https://maps.googleapis.com/maps/api/staticmap?mobile=false&sensor=true&maptype=satellite&size=575x242&zoom=17&center=29.729679107666016,-95.2992935180664&key=AIzaSyBJsNQO5ZeG-XAbqqWLKwG08fWITSxg33w&signature=KpuJnpCAq5rL4CeQ8PwnPQfyMFk=', 'isFeatured': False, 'isNonOwnerOccupied': True, 'isPreforeclosureAuction': False, 'isPremierBuilder': False, 'isShowcaseListing': False, 'isUnmappable': False, 'isZillowOwned': False, 'latitude': 29.72968, 'listing_sub_type': {'is_FSBA': True}, 'livingArea': 1106.0, 'longitude': -95.29929, 'lotAreaUnit': 'sqft', 'lotAreaValue': 5000.688, 'price': 275000.0, 'priceForHDP': 275000.0, 'rentZestimate': 1700, 'shouldHighlight': False, 'state': 'TX', 'streetAddress': '416 Forest Hill Blvd', 'taxAssessedValue': 264445.0, 'timeOnZillow': 28

In [None]:
from confluent_kafka import Producer, KafkaException
import time
import json

#config_file = "gdrive/My Drive/Colab Notebooks/Big Data/client.properties"

# Function to read Confluent Cloud configuration from file
def read_ccloud_config(config_file="client.properties"):
    """Parse a ``key=value`` properties file into a dict of client settings.

    Blank lines and lines whose first non-blank character is ``#`` are
    skipped. Each remaining line is split on its first ``=`` only, so values
    may themselves contain ``=``. Whitespace around both the key and the value
    is removed, which lets ``key = value`` lines work and be filtered
    correctly.

    The keys listed in ``omitted_fields`` are left out of the result
    (presumably because the producer client does not accept them — confirm).

    Args:
        config_file: Path of the properties file to read. Defaults to
            ``client.properties`` in the working directory, so the function
            can be called with or without an argument.

    Returns:
        dict mapping setting name to its string value.

    Raises:
        OSError: If the file cannot be opened.
        ValueError: If a non-comment line has no ``=`` separator.
    """
    omitted_fields = {'schema.registry.url', 'basic.auth.credentials.source', 'basic.auth.user.info'}
    conf = {}
    with open(config_file) as fh:
        for line_number, raw_line in enumerate(fh, start=1):
            line = raw_line.strip()
            if not line or line.startswith("#"):
                continue
            parameter, separator, value = line.partition('=')
            if not separator:
                # Fail with the offending location instead of an opaque
                # tuple-unpacking error.
                raise ValueError(
                    f"{config_file}:{line_number}: expected 'key=value', got {line!r}"
                )
            # Strip the key as well, so "key = value" does not yield "key ".
            parameter = parameter.strip()
            if parameter not in omitted_fields:
                conf[parameter] = value.strip()
    return conf

# Function to publish JSON data to Kafka topic
def publish_json_to_broker(response, topic="topic_0"):
    """Publish each entry of an API response's ``results`` list to Kafka.

    Every entry is serialized to JSON and produced as one message with no key.

    Args:
        response: Either the decoded JSON payload (a dict) or an object with a
            ``json()`` method that returns it, such as the ``requests``
            response created earlier in the notebook.
        topic: Kafka topic to produce to.

    Raises:
        OSError: If the client configuration file cannot be opened.
    """
    # Establish connection to Kafka broker. Called without arguments so the
    # reader falls back to its own "client.properties" path.
    producer = Producer(read_ccloud_config())

    def report_delivery(err, msg):
        # produce() only enqueues the message; the broker's verdict arrives
        # later through this callback, invoked from poll()/flush().
        if err is not None:
            print("Error publishing message to Kafka:", err)

    # A requests.Response has no .get(); decode it to the JSON dict first.
    body = response.json() if callable(getattr(response, "json", None)) else response
    results = body.get('results', [])

    try:
        # Iterate over each record in the response and publish to Kafka topic
        for result in results:
            try:
                payload = json.dumps(result)
                # Publish message to Kafka topic
                producer.produce(topic, value=payload.encode(), on_delivery=report_delivery)
                # Serve delivery callbacks without blocking.
                producer.poll(0)
                print("Published JSON data to Kafka:", payload)
                time.sleep(1)  # Optional: Add delay between each message
            except (KafkaException, BufferError) as ke:
                # BufferError is raised when the local producer queue is full.
                print("Error publishing message to Kafka:", ke)
    finally:
        # Deliver whatever is still queued, even when the loop is interrupted.
        # The Producer class in confluent-kafka 2.4.0 has no close() method;
        # flush() is what drains the queue.
        producer.flush()

# Main function to execute the publishing process
def main():
    """Republish the notebook-level ``response`` to Kafka until interrupted.

    The same ``response`` object (created by the request cell) is published
    again every 10 seconds; it is not re-fetched between iterations.
    Interrupting the kernel ends the loop with a message instead of a
    traceback.
    """
    try:
        # Continuously publish JSON data to Kafka topic
        while True:
            publish_json_to_broker(response)  # Publish JSON data
            time.sleep(10)  # Optional: Add delay between each iteration
    except KeyboardInterrupt:
        # Interrupting the kernel is the only way out of the loop.
        print("Publishing stopped by user.")

if __name__ == "__main__":
    main()
