In [None]:
# Install the PySpark library, which is a Python API for Apache Spark, a distributed computing system.
%pip install pyspark

# Install the PyMongo library, which is a Python driver for MongoDB, a NoSQL database.
%pip install pymongo

# Install the PyMongo-Spark library, which is a Spark connector for MongoDB, allowing you to read and write data between Spark and MongoDB.
%pip install pymongo-spark

# Install the PyMongo library with the 'srv' option, which is required for connecting to MongoDB Atlas, MongoDB's cloud database service, using the 'mongodb+srv' URI scheme.
# Also, install the PySpark library, which is a Python API for Apache Spark, a distributed computing system.
%pip install pymongo[srv] pyspark

In [None]:
from pyspark.sql import SparkSession
from pymongo import MongoClient

In [None]:
def spark_inst():
    """Return the local SparkSession named 'Spark', creating it if needed.

    The session runs locally with one worker thread per logical core
    (``local[*]``); ``getOrCreate`` hands back an already-running session
    instead of starting a second one.
    """
    builder = SparkSession.builder.master("local[*]").appName('Spark')
    return builder.getOrCreate()


spark_inst()

**Extracting data from MongoDB into a Spark DataFrame**

In [1]:
from pyspark.sql import SparkSession
from pymongo import MongoClient
import bson
import time

def extract_data_from_mongodb(uri="mongodb://localhost:27017",
                              db_name="Stage0ims",
                              collection_name="Insurance_Policy_Data"):
    """Load a MongoDB collection into a Spark DataFrame via PyMongo and show it.

    Documents are fetched with PyMongo and passed to ``spark.createDataFrame``.
    Top-level ``bson.ObjectId`` values (such as ``_id``) are converted to
    strings first so Spark's schema inference can handle them.

    The MongoDB client is closed and the Spark session stopped even if the
    read or ``show`` fails.

    Parameters
    ----------
    uri : str
        MongoDB connection string.
    db_name : str
        Database to read from.
    collection_name : str
        Collection to read.
    """
    # Create a Spark session
    spark = SparkSession.builder \
        .appName("MongoDB Extraction") \
        .getOrCreate()
    try:
        # MongoClient used as a context manager closes the connection on exit.
        with MongoClient(uri) as client:
            collection = client[db_name][collection_name]
            # The whole collection is materialized in driver memory here.
            docs = [
                {k: str(v) if isinstance(v, bson.ObjectId) else v for k, v in doc.items()}
                for doc in collection.find()
            ]
        if not docs:
            # createDataFrame cannot infer a schema from an empty list.
            print(f"{db_name}.{collection_name} is empty; nothing to show.")
            return
        df = spark.createDataFrame(docs)
        df.show()
    finally:
        spark.stop()

# Call the function to extract data from MongoDB
extract_data_from_mongodb()


+--------------------+---------+--------------------+--------------------+--------------------+--------------------+------------------------+--------------------+--------------------+-----------------------+--------------------+------------------------------+--------------------+------------------+--------------------+--------------------+--------------------+--------+----------+------------+---------+------------------+------------+---------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-----------+-------------+----------+--------------------+-----------+--------------------+--------------------+---------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|      AccountingDate|AgentCode|    ApprovedDateTime|          ApproverId|  

**Extracting data from MongoDB and measuring execution time**

In [None]:
from pyspark.sql import SparkSession
from pymongo import MongoClient
import bson
import time

def main(func=None):
    """Run ``func`` and print how long it took, in seconds.

    ``time.perf_counter`` is used because it is monotonic and high-resolution,
    which suits measuring an interval. The timing line is printed even if
    ``func`` raises; the exception still propagates.

    Parameters
    ----------
    func : callable, optional
        Zero-argument callable to time. ``None`` (the default) times
        ``extract_data_from_mongodb``, looked up when ``main`` is called.
    """
    if func is None:
        func = extract_data_from_mongodb

    startTime = time.perf_counter()
    try:
        # Code or function calls you want to measure
        func()
    finally:
        totalTime = time.perf_counter() - startTime
        # Report a single timing line.
        print(f"Execution time: {totalTime} seconds")

def extract_data_from_mongodb(uri="mongodb://localhost:27017",
                              db_name="Stage0ims",
                              collection_name="Insurance_Policy_Data"):
    """Load a MongoDB collection into a Spark DataFrame via PyMongo and show it.

    Documents are fetched with PyMongo and passed to ``spark.createDataFrame``.
    Top-level ``bson.ObjectId`` values (such as ``_id``) are converted to
    strings first so Spark's schema inference can handle them.

    The MongoDB client is closed and the Spark session stopped even if the
    read or ``show`` fails.

    Parameters
    ----------
    uri : str
        MongoDB connection string.
    db_name : str
        Database to read from.
    collection_name : str
        Collection to read.
    """
    # Create a Spark session
    spark = SparkSession.builder \
        .appName("MongoDB Extraction") \
        .getOrCreate()
    try:
        # MongoClient used as a context manager closes the connection on exit.
        with MongoClient(uri) as client:
            collection = client[db_name][collection_name]
            # The whole collection is materialized in driver memory here.
            docs = [
                {k: str(v) if isinstance(v, bson.ObjectId) else v for k, v in doc.items()}
                for doc in collection.find()
            ]
        if not docs:
            # createDataFrame cannot infer a schema from an empty list.
            print(f"{db_name}.{collection_name} is empty; nothing to show.")
            return
        df = spark.createDataFrame(docs)
        df.show()
    finally:
        spark.stop()

# Call the main function to measure execution time
main()

**Data extraction: reading the `world.city` collection from MongoDB with the Spark connector, filtering it, and timing the call**

In [None]:
from pyspark.sql import SparkSession
import time

def extract_data_from_mongodb(country_code='CC1'):
    """Read ``world.city`` with the MongoDB Spark connector and show it.

    First shows only the rows whose ``CountryCode`` equals ``country_code``,
    then all rows. The Spark session is stopped afterwards, even if the read
    or ``show`` fails.

    Parameters
    ----------
    country_code : str
        Value of ``CountryCode`` to filter on (default ``'CC1'``).
    """
    # Create a Spark session with increased driver memory and the MongoDB connector.
    # NOTE(review): static settings such as spark.jars.packages and
    # spark.driver.memory only take effect when this call starts a new Spark
    # application, not when getOrCreate returns an already-running one.
    spark = SparkSession.builder \
        .appName("YourAppName") \
        .config("spark.driver.memory", "2g") \
        .config("spark.mongodb.input.uri", "mongodb://localhost:27017/world.city") \
        .config("spark.mongodb.output.uri", "mongodb://localhost:27017/world.city") \
        .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:3.0.1") \
        .getOrCreate()
    try:
        # Load data into PySpark DataFrame (source is spark.mongodb.input.uri)
        df = spark.read.format("com.mongodb.spark.sql.DefaultSource").load()

        # NOTE(review): assumes the source field is 'CountryCode'; a later cell
        # renames 'Country_Code' -> 'CountryCode', so confirm the actual name.
        df_filtered = df.filter(df['CountryCode'] == country_code)

        # Show filtered data, then the full data
        df_filtered.show()
        df.show()
    finally:
        # Stop Spark session
        spark.stop()
    
# Time one call. time.perf_counter is monotonic, unlike time.time, so the
# measured interval cannot be skewed by system clock adjustments.
starttime = time.perf_counter()
extract_data_from_mongodb()
endtime = time.perf_counter()
totalTime = endtime - starttime

print(f"total time to call the function is : {totalTime}")

**Transformation 1: renaming the columns**

In [None]:
from pyspark.sql import SparkSession

def extract_data_from_mongodb():
    """Show ``world.city`` with its underscore-separated column names renamed.

    Reads the collection through the MongoDB Spark connector, applies the
    renames below, and shows the result. The Spark session is stopped
    afterwards, even if the read or ``show`` fails.
    """
    # Old name -> new name, applied in this order. DataFrame.withColumnRenamed
    # is a no-op for a column that is not in the schema, so a missing source
    # column is silently skipped rather than raising.
    column_renames = {
        "Country_Code": "CountryCode",
        "District_Name": "DistrictName",
        "City_ID": "CityID",
        "City_Name": "CityName",
        "City_Population": "CityPopulation",
    }

    # Create a Spark session with increased driver memory and the MongoDB connector.
    spark = SparkSession.builder \
        .appName("YourAppName") \
        .config("spark.driver.memory", "2g") \
        .config("spark.mongodb.input.uri", "mongodb://localhost:27017/world.city") \
        .config("spark.mongodb.output.uri", "mongodb://localhost:27017/world.city") \
        .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:3.0.1") \
        .getOrCreate()
    try:
        # Load data into PySpark DataFrame (source is spark.mongodb.input.uri)
        df = spark.read.format("com.mongodb.spark.sql.DefaultSource").load()

        renamedf = df
        for old_name, new_name in column_renames.items():
            renamedf = renamedf.withColumnRenamed(old_name, new_name)

        # Show data
        renamedf.show()
    finally:
        # Stop Spark session
        spark.stop()

extract_data_from_mongodb()

**Loading the transformed data into another MongoDB database (`world2.city2`)**

In [None]:
from pyspark.sql import SparkSession

def extract_data_from_mongodb():
    """Rename the columns of ``world.city``, show the result, and write it to ``world2.city2``.

    Reads through the MongoDB Spark connector. The write uses mode
    ``"overwrite"``, so the existing contents of ``world2.city2`` are
    replaced. The Spark session is stopped afterwards, even if the read,
    ``show`` or write fails.
    """
    # Old name -> new name, applied in this order. DataFrame.withColumnRenamed
    # is a no-op for a column that is not in the schema.
    column_renames = {
        "Country_Code": "CountryCode",
        "District_Name": "DistrictName",
        "City_ID": "CityID",
        "City_Name": "CityName",
        "City_Population": "CityPopulation",
    }

    # Create a Spark session with increased driver memory and the MongoDB connector.
    spark = SparkSession.builder \
        .appName("YourAppName") \
        .config("spark.driver.memory", "2g") \
        .config("spark.mongodb.input.uri", "mongodb://localhost:27017/world.city") \
        .config("spark.mongodb.output.uri", "mongodb://localhost:27017/world.city") \
        .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:3.0.1") \
        .getOrCreate()
    try:
        # Load data into PySpark DataFrame (source is spark.mongodb.input.uri)
        df = spark.read.format("com.mongodb.spark.sql.DefaultSource").load()

        renamedf = df
        for old_name, new_name in column_renames.items():
            renamedf = renamedf.withColumnRenamed(old_name, new_name)

        # Show data
        renamedf.show()

        # Write data to another MongoDB database; the "uri" option overrides
        # the session-level spark.mongodb.output.uri for this write only.
        renamedf.write.format("com.mongodb.spark.sql.DefaultSource") \
            .option("uri", "mongodb://localhost:27017/world2.city2") \
            .mode("overwrite") \
            .save()
    finally:
        # Stop Spark session
        spark.stop()

extract_data_from_mongodb()


**Transformation 2: selecting specific columns from the source database**

In [None]:
from pyspark.sql import SparkSession

def extract_data_from_mongodb():
    """Show only the ``CountryCode`` and ``District`` columns of ``world.city``.

    Reads the collection through the MongoDB Spark connector. The Spark
    session is stopped afterwards, even if the read or ``show`` fails.
    """
    # Create a Spark session with increased driver memory and the MongoDB connector.
    spark = SparkSession.builder \
        .appName("YourAppName") \
        .config("spark.driver.memory", "2g") \
        .config("spark.mongodb.input.uri", "mongodb://localhost:27017/world.city") \
        .config("spark.mongodb.output.uri", "mongodb://localhost:27017/world.city") \
        .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:3.0.1") \
        .getOrCreate()
    try:
        # Load data into PySpark DataFrame (source is spark.mongodb.input.uri)
        df = spark.read.format("com.mongodb.spark.sql.DefaultSource").load()
        df.select("CountryCode", "District").show()
    finally:
        # Stop Spark session
        spark.stop()

extract_data_from_mongodb()

**Loading the selected columns into MongoDB (`world2.city3`)**

In [None]:
from pyspark.sql import SparkSession

def extract_data_from_mongodb():
    """Write the ``CountryCode`` and ``District`` columns of ``world.city`` to ``world2.city3``.

    Reads through the MongoDB Spark connector. The write uses mode
    ``"overwrite"``, so the existing contents of ``world2.city3`` are
    replaced. Nothing is displayed. The Spark session is stopped afterwards,
    even if the read or write fails.
    """
    # Create a Spark session with increased driver memory and the MongoDB connector.
    spark = SparkSession.builder \
        .appName("YourAppName") \
        .config("spark.driver.memory", "2g") \
        .config("spark.mongodb.input.uri", "mongodb://localhost:27017/world.city") \
        .config("spark.mongodb.output.uri", "mongodb://localhost:27017/world.city") \
        .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:3.0.1") \
        .getOrCreate()
    try:
        # Load data into PySpark DataFrame (source is spark.mongodb.input.uri)
        df = spark.read.format("com.mongodb.spark.sql.DefaultSource").load()
        selected_df = df.select("CountryCode", "District")

        # Write selected columns to MongoDB; the "uri" option overrides the
        # session-level spark.mongodb.output.uri for this write only.
        selected_df.write.format("com.mongodb.spark.sql.DefaultSource") \
            .option("uri", "mongodb://localhost:27017/world2.city3") \
            .mode("overwrite") \
            .save()
    finally:
        # Stop Spark session
        spark.stop()

extract_data_from_mongodb()

In [None]:
spark = SparkSession.builder.master("local[*]")\
           .appName('Spark')\
           .getOrCreate()

# Column names shared by both sample DataFrames below.
columns = ["firstname", "middlename", "lastname", "dob", "gender", "salary"]

# Several sample people; '' marks a missing middle or last name.
# NOTE(review): Jen's salary of -1 looks like a placeholder for "unknown" — confirm.
people_data = [('James','','Smith','1991-04-01','M',3000),
  ('Michael','Rose','','2000-05-19','M',4000),
  ('Robert','','Williams','1978-09-05','M',4000),
  ('Maria','Anne','Jones','1967-12-01','F',4000),
  ('Jen','Mary','Brown','1980-02-17','F',-1)
]
people_df = spark.createDataFrame(data=people_data, schema=columns)
people_df.show()

# A single-row DataFrame built with the same columns and the same session.
single_person_data = [('Chirag','','Gupta','2001-06-17','M',3000)]
single_person_df = spark.createDataFrame(data=single_person_data, schema=columns)
single_person_df.show()

In [None]:
from pyspark.sql import SparkSession

# Bind the session to `spark`, the name used below. Previously it was bound to
# `Spark`, so this cell only worked if an earlier cell had created `spark`.
spark = SparkSession.builder.appName('DataFrame example').getOrCreate()

# Raw string so the Windows backslashes are not parsed as escape sequences.
# TODO: make this configurable instead of an absolute local path.
ACCOUNT_CSV_PATH = r'C:\Workspaces\CodeSpaces\Analytics\Datasets\Account.csv'

df = spark.read.csv(ACCOUNT_CSV_PATH, header=True, inferSchema=True)

df.show()

In [None]:
import pyspark.sql.types as T
from pyspark.sql import SparkSession

# Local SparkSession using all cores; swap the master URL (e.g. 'yarn') to run on a cluster.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("Spark")
    .getOrCreate()
)

# One sample row: firstname, middlename, lastname, dob, gender, salary.
data = [('Chirag', '', 'Gupta', '2001-06-17', 'M', 3000)]

# Explicit schema: every column is a nullable string except salary, a nullable integer.
columns = ["firstname", "middlename", "lastname", "dob", "gender", "salary"]
schema = T.StructType([
    T.StructField(name, T.IntegerType() if name == "salary" else T.StringType(), True)
    for name in columns
])

# Build the DataFrame against the explicit schema and display it.
df = spark.createDataFrame(data, schema=schema)
df.show()


In [None]:
from pyspark import SparkContext, SparkConf, SQLContext
from pyspark.sql import SparkSession
import pyodbc
import pandas as pd
import warnings

import getpass
import os

# Silence only pandas' notice about receiving a raw DBAPI (pyodbc) connection
# instead of a SQLAlchemy connectable, rather than every warning in the kernel.
warnings.filterwarnings('ignore', message='pandas only supports SQLAlchemy', category=UserWarning)

# Set up Spark configuration
spark = SparkSession.builder \
    .master("local[*]") \
    .appName("Spark") \
    .getOrCreate()

# Connection settings come from the environment so credentials never live in
# the notebook. If MSSQL_PASSWORD is unset, the password is prompted for.
# NOTE(review): a plaintext password was previously hardcoded here — rotate it.
server = os.environ.get("MSSQL_SERVER", "125.20.10.50,1433")
database = os.environ.get("MSSQL_DATABASE", "company_db")
table = "pipeline.employees"
user = os.environ.get("MSSQL_USER", "Admin")
password = os.environ.get("MSSQL_PASSWORD") or getpass.getpass(f"SQL Server password for {user}: ")

# Connect to SQL Server using pyodbc. pyodbc's `with conn:` commits but does not
# close the connection, so close it explicitly once the query has been read.
conn = pyodbc.connect(f"DRIVER={{ODBC Driver 17 for SQL Server}};SERVER={server};DATABASE={database};UID={user};PWD={password}")
try:
    query = f"SELECT * FROM {table}"
    pdf = pd.read_sql(query, conn)
finally:
    conn.close()
sparkDF = spark.createDataFrame(pdf)

# Show the DataFrame
sparkDF.show()
