In [1]:
from elasticsearch import Elasticsearch
from dotenv import load_dotenv
import os

load_dotenv()

client = Elasticsearch(
  os.getenv("ELASTIC_URL"),
  api_key=(os.getenv("ELASTIC_API_KEY"))
)


In [4]:
def delete_index(elastic_client: Elasticsearch, index_name: str):
    try:
        # Use Elasticsearch.indices.delete method
        response = elastic_client.indices.delete(index=index_name)
        
        if response and response.get("acknowledged", False):
            print(f"Index '{index_name}' deleted successfully!")
            return True
        else:
            print(f"Failed to delete index '{index_name}'. Response: {response}")
            return False
    except Exception as e:
        print(f"Error deleting index '{index_name}': {str(e)}")
        return False
    
mappings_folder = os.getenv("BASE_PROJECT_PATH") + "src/mappings/"
# get files that end with .json
mapping_files = [f for f in os.listdir(mappings_folder) if f.endswith(".json")]
# loop through the files
for mapping_file in mapping_files:
    # get the index name
    index_name = mapping_file.split(".")[0]
    # create the index
    delete_index(client, index_name)

Index 'jayzz_movie_index' deleted successfully!
Index 'jayzz_review_index' deleted successfully!
Index 'jayzz_user_index' deleted successfully!


In [33]:
def find_movie_by_id( index_name: str, movie_id: str):
    try:
        elastic_client = Elasticsearch(
            os.getenv("ELASTIC_URL"),
            api_key=(os.getenv("ELASTIC_API_KEY"))
        )
        # Use Elasticsearch.search method to find the movie by movieId
        response = elastic_client.search(
            index=index_name,
            body={
                "query": {
                    "match": {
                        "movieId": movie_id
                    }
                }
            }
        )

        # Check if there are hits in the response
        hits = response["hits"]["hits"]
        
        # close the connection
        elastic_client.close()
        
        if hits:
            # Return the first hit (assuming movieId is unique)
            return hits[0]["_source"]
        else:
            return None
    except Exception as e:
        print(f"Error finding movie by movieId '{movie_id}': {str(e)}")
        return None
    


In [34]:
find_movie_avg_rating_by_id("jayzz_movie_index", "8")


0.0

In [3]:
from elasticsearch import Elasticsearch
from typing import Dict
import json
from dotenv import load_dotenv
import os

load_dotenv()

def create_index(client: Elasticsearch, index_name: str, mapping: Dict):
    try:
        # Use Elasticsearch.indices.create method
        client.indices.create(
            index=index_name,
            body=mapping,
            ignore=400  # Ignore 400 already exists code
        )
        print(f"Created index {index_name} successfully!")
        return True
    except Exception as e:
        print(f"Error creating index {index_name}: {str(e)}")
        return False
    
mappings_folder = os.getenv("BASE_PROJECT_PATH") + "src/mappings/"
# get files that end with .json
mapping_files = [f for f in os.listdir(mappings_folder) if f.endswith(".json")]
# loop through the files
for mapping_file in mapping_files:
    # get the index name
    index_name = mapping_file.split(".")[0]
    # open the file and load the json
    with open(mappings_folder + mapping_file) as f:
        mapping = json.load(f)
    # create the index
    create_index(client, index_name, mapping)


  client.indices.create(


Created index jayzz_movie_index successfully!
Created index jayzz_review_index successfully!
Created index jayzz_user_index successfully!


In [None]:

create_index(client, 'nested_movies_reviews', nested_review_index_mapping)


Created index nested_movies_reviews successfully!


True

In [4]:
import findspark
findspark.init()
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()

columns = ["Seqno","Name"]
data = [("1", "john jones"),
    ("2", "tracey smith"),
    ("3", "amy sanders")]

df = spark.createDataFrame(data=data,schema=columns)

df.show(truncate=False)

+-----+------------+
|Seqno|Name        |
+-----+------------+
|1    |john jones  |
|2    |tracey smith|
|3    |amy sanders |
+-----+------------+



In [5]:
def convertCase(str):
    resStr=""
    arr = str.split(" ")
    for x in arr:
       resStr= resStr + x[0:1].upper() + x[1:len(x)] + " "
    return resStr 

In [6]:
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType

# Converting function to UDF 
convertUDF = udf(lambda z: convertCase(z),StringType())

In [7]:
# StringType() is by default hence not required 
convertUDF = udf(lambda z: convertCase(z)) 

In [8]:
df.select(col("Seqno"), \
    convertUDF(col("Name")).alias("Name") ) \
   .show(truncate=False)

+-----+-------------+
|Seqno|Name         |
+-----+-------------+
|1    |John Jones   |
|2    |Tracey Smith |
|3    |Amy Sanders  |
+-----+-------------+



In [12]:
from confluent_kafka.admin import AdminClient, NewTopic

def recreate_kafka_topic(topic_name, bootstrap_servers):
    # Set up Kafka AdminClient
    admin_conf = {
        'bootstrap.servers': bootstrap_servers,
        'client.id': 'kafka_admin_client'
    }
    
    admin_client = AdminClient(admin_conf)

    # Delete the existing topic if it exists
    topic_deletion_futures = admin_client.delete_topics([topic_name], operation_timeout=5)
    for topic, future in topic_deletion_futures.items():
        try:
            future.result()
            print(f"Topic '{topic}' deleted successfully.")
        except Exception as e:
            print(f"Failed to delete topic '{topic}': {e}")

    # Recreate the topic with desired configuration
    topic_config = {
        'cleanup.policy': 'delete',  # Customize other configuration parameters as needed
        'retention.ms': '86400000'  # Example: Set retention time to 24 hours
    }
    
    new_topic = NewTopic(topic_name, num_partitions=1, replication_factor=1, config=topic_config)
    topic_creation_futures = admin_client.create_topics([new_topic], operation_timeout=5)
    for topic, future in topic_creation_futures.items():
        try:
            future.result()
            print(f"Topic '{topic}' created successfully.")
        except Exception as e:
            print(f"Failed to create topic '{topic}': {e}")


# Example usage
topic_name_to_recreate = 'reviews'
bootstrap_servers_to_use = 'localhost:9092'  # Replace with your Kafka broker's address
recreate_kafka_topic(topic_name_to_recreate, bootstrap_servers_to_use)


KeyboardInterrupt: 