In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Step 1: Extract
def extract(file_path):
    spark = SparkSession.builder \
        .appName("ETL Example") \
        .config("spark.mongodb.input.uri", "mongodb://localhost:27017/bank_db.branch_transactions") \
        .config("spark.mongodb.output.uri", "mongodb://localhost:27017/bank_db.branch_transactions") \
        .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:3.0.1") \
        .getOrCreate()
    
    df = spark.read.csv(file_path, header=True, inferSchema=True)
    return df

# Step 2: Transform
def transform(df):
    # Example transformation: increase salary by 10%
    transformed_df = df.withColumn("salary", col("salary") * 1.1)
    return transformed_df

# Step 3: Load
def load(df):
    # Write to MongoDB
    # df.write.format("mongo").mode("overwrite").save()
    df.write \
    .format("mongo") \
    .mode("overwrite") \
    .option("uri", "mongodb://localhost:27017/bank_db.branch_transactions") \
    .save()

def main():
    input_path = r'C:\Users\kidan\OneDrive\Desktop\data scince jupyter\Salary_Data.csv'

    # ETL Process
    data = extract(input_path)
    transformed_data = transform(data)
    load(transformed_data)

if __name__ == "__main__":
    main()

In [5]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Step 1: Extract
def extract(file_path):
    spark = SparkSession.builder \
        .appName("ETL Example") \
        .config("spark.mongodb.output.uri", "mongodb://localhost:27017/bank_db.branch_transactions") \
        .config("spark.mongodb.input.uri", "mongodb://localhost:27017/bank_db.branch_transactions") \
        .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:3.0.1") \
        .getOrCreate()
    
    df = spark.read.csv(file_path, header=True, inferSchema=True)
    return df

# Step 2: Transform
def transform(df):
    # Example transformation: increase salary by 10%
    transformed_df = df.withColumn("salary", col("salary") * 1.1)
    return transformed_df

# Step 3: Load
def load(df):
    # Write to MongoDB
    df.write \
    .format("mongo") \
    .mode("overwrite") \
    .option("uri", "mongodb://localhost:27017/bank_db.branch_transactions") \
    .save()

# Step 4: Read from MongoDB
def read_from_mongodb(spark):
    # Read data from MongoDB
    mongodb_df = spark.read.format("mongo").load()
    return mongodb_df

def main():
    input_path = r'C:\Users\kidan\OneDrive\Desktop\data scince jupyter\Salary_Data.csv'

    # ETL Process
    data = extract(input_path)
    transformed_data = transform(data)
    load(transformed_data)

    # Read from MongoDB
    spark = SparkSession.builder \
        .appName("Read from MongoDB") \
        .config("spark.mongodb.input.uri", "mongodb://localhost:27017/bank_db.branch_transactions") \
        .getOrCreate()
        
    loaded_data = read_from_mongodb(spark)
    
    # Show the data read from MongoDB
    loaded_data.show()

if __name__ == "__main__":
    main()

+---------------+--------------------+------------------+
|YearsExperience|                 _id|            salary|
+---------------+--------------------+------------------+
|            1.1|{671a000669bcb764...|           43277.3|
|            1.3|{671a000669bcb764...| 50825.50000000001|
|            1.5|{671a000669bcb764...|41504.100000000006|
|            2.0|{671a000669bcb764...| 47877.50000000001|
|            2.2|{671a000669bcb764...|43880.100000000006|
|            2.9|{671a000669bcb764...|62306.200000000004|
|            3.0|{671a000669bcb764...|           66165.0|
|            3.2|{671a000669bcb764...| 59889.50000000001|
|            3.2|{671a000669bcb764...|           70889.5|
|            3.7|{671a000669bcb764...|           62907.9|
|            3.9|{671a000669bcb764...|           69539.8|
|            4.0|{671a000669bcb764...|           61373.4|
|            4.0|{671a000669bcb764...|62652.700000000004|
|            4.1|{671a000669bcb764...|62789.100000000006|
|            4

# Read(extract) data from mogodb Transform an load to MongoDB 

In [15]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when

# Step 1: Extract
def extract(spark):
    # Read data from MongoDB
    df = spark.read.format("mongo").option("uri", "mongodb://localhost:27017/bank_data.transactions").load()
    return df
# Step 2: Transform
def transform(df):
    # Example transformation: categorize accounts based on amount
    transformed_df = df.withColumn(
        "account_category", 
        when(col("amount") < 50000, "Legal")
        .when(col("amount") >= 50000, "Saspeciese")
        .otherwise("Unknown")
    )
    return transformed_df

# Step 3: Load
def load(df):
    # Write the transformed data back to MongoDB
    df.write.format("mongo").mode("overwrite").option("uri", "mongodb://localhost:27017/bank_data.branch_transactions").save()

def main():
    # Create Spark session
    spark = SparkSession.builder \
        .appName("ETL Bank Data Example") \
        .config("spark.mongodb.input.uri", "mongodb://localhost:27017/bank_data.transactions") \
        .config("spark.mongodb.output.uri", "mongodb://localhost:27017/bank_data.branch_transactions") \
        .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:3.0.1") \
        .getOrCreate()

    # ETL Process
    extracted_data = extract(spark)
    transformed_data = transform(extracted_data)
    load(transformed_data)

    # Optional: Read back the transformed data to verify
    loaded_data = spark.read.format("mongo").option("uri", "mongodb://localhost:27017/bank_data.branch_transaction").load()
    print(loaded_data.show())

if __name__ == "__main__":
    main()

++
||
++
++

None
