#Install groclake

In [None]:
!pip install --index-url https://test.pypi.org/simple/ --extra-index-url https://pypi.org/simple/ groclake==0.1.28

# Step 1: **Create a .env file.** In your project directory, create a .env file and add your configuration variables in KEY=VALUE format.

In [None]:
# Example .env file

# MySQL Configuration
MYSQL_HOST_PROD=127.0.0.1
MYSQL_PORT_PROD=3306
MYSQL_USER_PROD=root
MYSQL_PASSWORD_PROD=password
MYSQL_DB_PROD=example_db

# Elasticsearch Configuration
ES_HOST=127.0.0.1
ES_PORT=9200
ES_API_KEY=es_api_key

# Redis Configuration
REDIS_HOST=127.0.0.1
REDIS_PORT=6379

# Optional: GCP Credentials
# GCP_CREDENTIALS=/path/to/your/gcp_credentials.json


# Step 2: Create the config.py file

In your config.py file (lowercase, so that `from config import Config` works), use the dotenv package to load the .env file and assign the configuration values.

Example config file

In [None]:
import os
import base64
from dotenv import load_dotenv

# Load environment variables from the .env file so that the os.getenv()
# calls in Config below can read them.
load_dotenv()

def _env_int(name, default):
    """Return environment variable *name* as an int, or *default* if unset or blank.

    Raises:
        ValueError: If the variable is set to something that is not an integer.
    """
    raw = os.getenv(name)
    if raw is None or not raw.strip():
        return default
    try:
        return int(raw)
    except ValueError as err:
        raise ValueError(
            f"Environment variable {name} must be an integer, got {raw!r}"
        ) from err


class Config:
    """Connection settings for MySQL, Elasticsearch and Redis, read from the environment.

    All values are resolved once, when the class body is executed (i.e. at
    import time). Ports fall back to each service's standard port when the
    corresponding variable is unset, so a missing variable no longer aborts
    the import with ``TypeError: int() argument must be ... not 'NoneType'``.
    """

    MYSQL_CONFIG = {
        "host": os.getenv("MYSQL_HOST_PROD"),
        "port": _env_int("MYSQL_PORT_PROD", 3306),
        "username": os.getenv("MYSQL_USER_PROD"),
        "password": os.getenv("MYSQL_PASSWORD_PROD"),
        "database": os.getenv("MYSQL_DB_PROD"),
    }

    # Elasticsearch connection
    api_key = os.getenv("ES_API_KEY")
    # The key is base64-encoded before being placed in the Authorization
    # header. When no key is configured there is nothing to encode, so the
    # value is None and the header is left out below.
    encoded_api_key = (
        base64.b64encode(api_key.encode("utf-8")).decode("utf-8")
        if api_key
        else None
    )

    ES_CONFIG = {
        "host": os.getenv("ES_HOST"),
        "port": _env_int("ES_PORT", 9200),
        "headers": (
            {"Authorization": f"ApiKey {encoded_api_key}"}
            if encoded_api_key
            else {}
        ),
    }

    # Redis Configuration
    REDIS_CONFIG = {
        "host": os.getenv("REDIS_HOST"),  # 127.0.0.1
        "port": _env_int("REDIS_PORT", 6379),  # 6379
    }

    # Optional: GCP Credentials for storage and pipeline execution
    # GCP_CREDENTIALS = os.getenv("GCP_CREDENTIALS", "path_to_your_gcp_credentials.json")

# Step 3: Import Required Modules
The **Datalake** class from groclake.datalake provides the basic functionalities for managing data pipelines.

The **Config** module contains the configuration details for MySQL, Elasticsearch (ES), and Redis.

In [None]:
from groclake.datalake import Datalake
from config import Config

# Step 4: Define the DatalakeConnection Class
The DatalakeConnection class extends Datalake and adds specific data connections to a pipeline.

In [None]:
class DatalakeConnection(Datalake):
    """Datalake subclass that registers MySQL, Elasticsearch and Redis on one pipeline.

    On construction it creates a pipeline named ``test_pipeline``, adds one
    connection per backend using the settings in ``Config``, calls
    ``execute_all()`` and then stores the resulting connection objects in
    ``self.connections`` keyed by connection name.
    """

    def __init__(self):
        super().__init__()  # Inherit from Datalake class

        # Build a fresh dict per connection instead of writing
        # 'connection_type' into Config.*_CONFIG: those dicts are class-level
        # and shared, so mutating them would leak this tag into every other
        # user of Config.
        connection_configs = {
            "sql_connection": {**Config.MYSQL_CONFIG, "connection_type": "sql"},
            "es_connection": {**Config.ES_CONFIG, "connection_type": "es"},
            "redis_connection": {**Config.REDIS_CONFIG, "connection_type": "redis"},
        }

        # Create the pipeline and register the connections in a fixed order
        # (dicts preserve insertion order).
        self.test_pipeline = self.create_pipeline(name="test_pipeline")
        for connection_name, connection_config in connection_configs.items():
            self.test_pipeline.add_connection(
                name=connection_name, config=connection_config
            )

        # Execute all connections at once
        self.execute_all()

        # Look up every registered connection by name for later use.
        self.connections = {
            connection_name: self.get_connection(connection_name)
            for connection_name in connection_configs
        }

    def get_connection(self, connection_name):
        """Return the connection registered on the pipeline under *connection_name*."""
        return self.test_pipeline.get_connection_by_name(connection_name)


**Inheriting from Datalake:** The DatalakeConnection class inherits from the Datalake class, so it can use the methods defined in the parent class (like creating pipelines, adding connections, and executing tasks).

**Configuring Connections:** For each connection (MySQL, Elasticsearch, Redis), we fetch the connection details from Config. The connection types are added to distinguish between SQL databases (sql), Elasticsearch (es), and Redis (redis).

**Creating the Pipeline:** We create a pipeline called test_pipeline using the create_pipeline method from the Datalake class. This pipeline will manage all the data connections.

**Adding Connections:** We add each connection to the pipeline by specifying a name and configuration. Each connection is uniquely identified by its name (sql_connection, etc.).

**Executing Connections:** Once all connections are added, we use the execute_all method to execute all connections concurrently using threads.

**Storing Connections:** The connections dictionary stores the connections, which can later be accessed by name (e.g., sql_connection).

# Step 5: Initialize the DatalakeConnection class

In [None]:
# Copy the Groclake credentials returned by userdata.get() into environment
# variables.
# NOTE(review): `userdata` is not imported anywhere in this notebook;
# presumably it is `google.colab.userdata` - confirm and import it before
# running this cell.
for env_var, secret_name in (
    ('GROCLAKE_API_KEY', 'groclake_api_key'),
    ('GROCLAKE_ACCOUNT_ID', 'groclake_account_id'),
):
    os.environ[env_var] = userdata.get(secret_name)

In [None]:
# Instantiating DatalakeConnection runs its __init__, which builds the
# pipeline and populates `datalake_connection.connections`.
datalake_connection = DatalakeConnection()

Here we create an instance of the DatalakeConnection class. When the class is instantiated, it automatically creates the pipeline, adds the connections (MySQL, Elasticsearch, Redis), and executes them concurrently.

# Step 6: Accessing a Specific Connection

In [None]:
# Pull each connection object out of the dict built by DatalakeConnection,
# keyed by the name it was registered under, and show what was returned.
registered = datalake_connection.connections

mysql_connection = registered["sql_connection"]
print("MySQL Connection:", mysql_connection)

es_connection = registered["es_connection"]
print("Elasticsearch Connection:", es_connection)

redis_connection = registered["redis_connection"]
print("Redis Connection:", redis_connection)

MySQL Connection: <groclake.datalake.connection.SQLConnection object at 0x7b166dc5b410>
Elasticsearch Connection: <groclake.datalake.connection.ESConnection object at 0x7b166dc5ae10>
Redis Connection: <groclake.datalake.connection.RedisConnection object at 0x7b166dc58910>


# Use of MySQL Connection



In [None]:
# INSERT DATA
def insert_user(name, status, user_id="1234"):
    """Insert one row into the ``user_details`` table.

    Args:
        name: Value stored in the ``name`` column.
        status: Value stored in the ``status`` column.
        user_id: Value stored in the ``user_id`` column. Defaults to
            ``"1234"``, the id that used to be hard-coded, so existing
            two-argument calls behave as before.

    Returns:
        True once ``mysql_connection.write`` has returned without raising.
    """
    query = "INSERT INTO user_details (user_id, name, status) VALUES (%s, %s, %s)"
    # Values are passed separately from the SQL text (parameterized query).
    mysql_connection.write(query, (user_id, name, status))
    return True


#FETCH DATA
def get_user_info(user_id):
    """Return whatever ``mysql_connection.read`` yields for the given *user_id*.

    The lookup selects all columns from ``user_details`` and passes
    *user_id* as the single query parameter.
    """
    return mysql_connection.read(
        "SELECT * FROM user_details WHERE user_id = %s",
        (user_id,),
    )

# Use of Elasticsearch connection

In [None]:
# WRITE

# The document to store, wrapped together with the target index name.
user_document = {"user_id": "123", "name": "Alice", "role": "Engineer"}
write_query = {"index": "users", "body": user_document}

write_response = es_connection.write(write_query)
print("Write Response:", write_response)


# READ

# A match_all query against the same index.
match_everything = {"query": {"match_all": {}}}
read_query = {"index": "users", "body": match_everything}

# NOTE(review): the variable name and label suggest read() returns a count;
# confirm against the ESConnection.read implementation.
document_count = es_connection.read(read_query)
print("Total Documents:", document_count)

# Use of Redis Connection


In [None]:
# WRITE
key = "user:1000:name"
value = "John Doe"
cache_ttl = 3600  # TTL of 1 hour

redis_connection.set(key, value, cache_ttl)
print(f"Set value for {key}: {value}")

# READ
key = "user:1000:name"
value = redis_connection.get(key)
# Only bytes need decoding. get() may also yield None (key missing or
# expired) or an already-decoded str, and calling .decode() on those would
# raise AttributeError.
display_value = value.decode('utf-8') if isinstance(value, bytes) else value
print(f"Got value for {key}: {display_value}")