<a href="https://colab.research.google.com/github/TolulopeOyejide/PySpark/blob/main/PySpark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Reference: [PySpark with SQL Server, read and write table (SparkByExamples)](https://sparkbyexamples.com/pyspark/pyspark-with-sql-server-read-and-write-table/)

# **PySpark Connector For SQL Server**


Details you need before connecting:
1. Driver (connector) to use; I will provide this (see the connector versions below)
2. SQL Server address and port
3. Database name
4. Table name
5. User name and password
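The five details above end up as the `url`, `dbtable`, `user` and `password` options used in every read and write below. As a minimal sketch, a hypothetical helper `build_connection_options` (not part of PySpark or the connector) can assemble them in one place. It assumes SQL Server's default TCP port, 1433, unless you pass another one.

```python
# Hypothetical helper (not a PySpark or connector API): collects the
# connection details into the options used by .option(...) calls.
def build_connection_options(server: str, database: str, table: str,
                             user: str, password: str, port: int = 1433) -> dict:
    """Return read/write options for the SQL Server connector.

    1433 is SQL Server's default TCP port; pass another value if your
    instance listens elsewhere.
    """
    # JDBC URL in the same form as the examples: host:port plus database name.
    url = f"jdbc:sqlserver://{server}:{port};databaseName={database};"
    return {
        "url": url,
        "dbtable": table,
        "user": user,
        "password": password,
    }


opts = build_connection_options("localhost", "emp", "employee",
                                "replace_user_name", "replace_password")
print(opts["url"])  # jdbc:sqlserver://localhost:1433;databaseName=emp;
```

Because `DataFrameReader` and `DataFrameWriter` both have an `options(**kwargs)` method, such a dict could replace the four separate `.option(...)` lines with a single `.options(**opts)` call.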

Steps to connect PySpark to SQL Server and then read and write data:
- Step 1: Identify the connector version that matches your Spark version
- Step 2: Add the connector dependency
- Step 3: Create a SparkSession and a DataFrame
- Step 4: Save the PySpark DataFrame to a SQL Server table
- Step 5: Read the SQL Server table into a PySpark DataFrame


The connector works with both:
1. SQL Server on-premises
2. Azure SQL

Connector version for each Spark version:
1. Spark 2.4.x: connector version 1.0.2
2. Spark 3.0.x: connector version 1.1.0
3. Spark 3.1.x: connector version 1.2.

You can find all available Spark connectors for Microsoft SQL Server (MSSQL) at https://search.maven.org/search?q=spark-mssql-connector
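Steps 1 and 2 are not shown as code in this notebook. A minimal sketch of Step 1 is a lookup from the Spark version to the connector's Maven coordinate. The artifact IDs (`spark-mssql-connector` for Spark 2.4, `spark-mssql-connector_2.12` for Spark 3.0) are my reading of the Maven listing linked above, so verify them there. Only the two fully listed versions are mapped, and `mssql_connector_package` is a hypothetical name.

```python
# Maven coordinates of the SQL Server connector, keyed by Spark major.minor.
# Assumption: artifact IDs as published on Maven Central; extend the dict
# for other Spark releases after checking the search link above.
CONNECTOR_PACKAGES = {
    "2.4": "com.microsoft.azure:spark-mssql-connector:1.0.2",
    "3.0": "com.microsoft.azure:spark-mssql-connector_2.12:1.1.0",
}


def mssql_connector_package(spark_version: str) -> str:
    """Map a full Spark version string such as '3.0.3' to a connector package."""
    # Keep only major.minor, e.g. '3.0.3' -> '3.0'.
    major_minor = ".".join(spark_version.split(".")[:2])
    try:
        return CONNECTOR_PACKAGES[major_minor]
    except KeyError:
        raise ValueError(f"no connector listed for Spark {spark_version}") from None


print(mssql_connector_package("3.0.3"))
```

For Step 2, the result would typically be passed to the session builder, e.g. `.config("spark.jars.packages", mssql_connector_package(pyspark.__version__))`, so that Spark downloads the connector when the session starts.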





# **Creating SparkSession & DataFrame**
Creating a SparkSession is the first step in any PySpark program.

In [5]:
# Imports
from pyspark.sql import SparkSession

# PySpark must be installed first, e.g. with "pip install pyspark" (in Colab: !pip install pyspark).


In [None]:
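# Creating SparkSession with the SQL Server connector on the classpath.
# The package below assumes Spark 3.0.x; use the connector version that
# matches your Spark version (see the list above).
spark = SparkSession.builder \
    .appName('SparkByExamples.com') \
    .config("spark.jars.packages", "com.microsoft.azure:spark-mssql-connector_2.12:1.1.0") \
    .getOrCreate()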

In [None]:
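data = [(1, "James", 30, "M"), (2, "Ann", 40, "F"), (3, "Jeff", 41, "M"), (4, "Jennifer", 20, "F")]  # (id, name, age, gender)
sampleDF = spark.sparkContext.parallelize(data).toDF(["id", "name", "age", "gender"])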

In [None]:
# Write to SQL Table
sampleDF.write \
  .format("com.microsoft.sqlserver.jdbc.spark") \
  .mode("overwrite") \
  .option("url", "jdbc:sqlserver://{SERVER_ADDR};databaseName=emp;") \
  .option("dbtable", "employee") \
  .option("user", "replace_user_name") \
  .option("password", "replace_password") \
  .save()


# mode("overwrite") replaces the table's contents with the DataFrame's rows.
# If the table already exists, overwrite first drops it and then recreates it
# from the DataFrame.
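Spark's `DataFrameWriter.mode()` accepts four save modes, and the default is `errorifexists`. To make the difference between them concrete, here is a toy, in-memory model. It is not Spark code: `save` and the dict standing in for the database are hypothetical. It only mimics the observable effect each mode has on the target table.

```python
from typing import Dict, List


# Toy model: the "database" is a dict of table name -> list of row tuples.
def save(db: Dict[str, List[tuple]], table: str, rows: List[tuple], mode: str) -> None:
    """Apply one of Spark's save modes to an in-memory table (toy model)."""
    exists = table in db
    if mode == "overwrite":
        # Drop (if present) and recreate: the old rows are gone.
        db[table] = list(rows)
    elif mode == "append":
        # Keep existing rows and add the new ones; create the table if missing.
        db.setdefault(table, []).extend(rows)
    elif mode in ("error", "errorifexists"):
        # Spark's default mode: refuse to write into an existing table.
        if exists:
            raise RuntimeError(f"table {table!r} already exists")
        db[table] = list(rows)
    elif mode == "ignore":
        # Write only if the table is absent; otherwise do nothing.
        if not exists:
            db[table] = list(rows)
    else:
        raise ValueError(f"unknown save mode: {mode!r}")


db = {}
save(db, "employee", [(1, "James")], "overwrite")
save(db, "employee", [(2, "Ann")], "append")
print(db["employee"])  # [(1, 'James'), (2, 'Ann')]
save(db, "employee", [(3, "Jeff")], "overwrite")
print(db["employee"])  # [(3, 'Jeff')]
```

The real connector achieves these outcomes with SQL statements against SQL Server. The toy only shows which rows survive each mode.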

In [None]:
# Read from SQL Table
df = spark.read \
  .format("com.microsoft.sqlserver.jdbc.spark") \
  .option("url", "jdbc:sqlserver://{SERVER_ADDR};databaseName=emp;") \
  .option("dbtable", "employee") \
  .option("user", "replace_user_name") \
  .option("password", "replace_password") \
  .load()

df.show()



In [None]:
# Select Specific Columns to Read
# Read from SQL Table
df = spark.read \
  .format("com.microsoft.sqlserver.jdbc.spark") \
  .option("url", "jdbc:sqlserver://{SERVER_ADDR};databaseName=emp;") \
  .option("dbtable", "(select id,age from employee where gender='M') AS subq") \
  .option("user", "replace_user_name") \
  .option("password", "replace_password") \
  .load()

df.show()
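Spark's JDBC source places the `dbtable` value in the `FROM` clause of the queries it generates, so a bare `SELECT` has to be wrapped in parentheses and given an alias, which SQL Server requires for a derived table. A small hypothetical helper, `as_dbtable_query`, sketches that wrapping.

```python
def as_dbtable_query(query: str, alias: str = "subq") -> str:
    """Wrap a SELECT so it can be passed as the JDBC 'dbtable' option."""
    # Drop surrounding whitespace and a trailing semicolon, which would be
    # invalid inside a derived table, then parenthesize and alias it.
    return f"({query.strip().rstrip(';')}) AS {alias}"


print(as_dbtable_query("select id,age from employee where gender='M'"))
# (select id,age from employee where gender='M') AS subq
```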


In [None]:
# APPEND TABLE
# Write to SQL Table
sampleDF.write \
  .format("com.microsoft.sqlserver.jdbc.spark") \
  .mode("append") \
  .option("url", "jdbc:sqlserver://{SERVER_ADDR};databaseName=emp;") \
  .option("dbtable", "employee") \
  .option("user", "replace_user_name") \
  .option("password", "replace_password") \
  .save()

# sampleDF.write.mode("append") adds the DataFrame's rows to the existing SQL Server table.

In [None]:
# COMPLETE EXAMPLE
# Imports
from pyspark.sql import SparkSession

# Create SparkSession
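# The package below assumes Spark 3.0.x; match it to your Spark version.
spark = SparkSession.builder \
    .appName('SparkByExamples.com') \
    .config("spark.jars.packages", "com.microsoft.azure:spark-mssql-connector_2.12:1.1.0") \
    .getOrCreate()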

# Create DataFrame
columns = ["id", "name","age","gender"]
data = [(1, "James",30,"M"), (2, "Ann",40,"F"),
    (3, "Jeff",41,"M"),(4, "Jennifer",20,"F")]

sampleDF = spark.sparkContext.parallelize(data).toDF(columns)

# Write to SQL Table
sampleDF.write \
  .format("com.microsoft.sqlserver.jdbc.spark") \
  .mode("overwrite") \
  .option("url", "jdbc:sqlserver://{SERVER_ADDR};databaseName=emp;") \
  .option("dbtable", "employee") \
  .option("user", "replace_user_name") \
  .option("password", "replace_password") \
  .save()

# Read from SQL Table
df = spark.read \
  .format("com.microsoft.sqlserver.jdbc.spark") \
  .option("url", "jdbc:sqlserver://{SERVER_ADDR};databaseName=emp;") \
  .option("dbtable", "employee") \
  .option("user", "replace_user_name") \
  .option("password", "replace_password") \
  .load()

df.show()
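The complete example keeps `replace_user_name` and `replace_password` as literal placeholders. A common alternative is to read the credentials from environment variables so they never appear in the notebook. The sketch below assumes hypothetical variable names `MSSQL_USER` and `MSSQL_PASSWORD`. Use whatever names your environment or secret store provides.

```python
import os


def credentials_from_env(user_var: str = "MSSQL_USER",
                         password_var: str = "MSSQL_PASSWORD") -> dict:
    """Read SQL Server credentials from environment variables.

    The default variable names are hypothetical; adjust them to your setup.
    """
    # Fail early with a clear message instead of sending empty credentials.
    missing = [v for v in (user_var, password_var) if not os.environ.get(v)]
    if missing:
        raise KeyError(f"set these environment variables: {', '.join(missing)}")
    return {"user": os.environ[user_var], "password": os.environ[password_var]}
```

The returned dict could then replace the two `.option("user", ...)` and `.option("password", ...)` lines with `.options(**credentials_from_env())` on either `spark.read` or `sampleDF.write`.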
