In [47]:
pip install mysql-connector-python pandas




In [48]:
import mysql.connector
import pandas as pd

# Connect to the local MySQL server. The schema is selected here, so the query
# below can name the table as `employees` instead of `employees.employees`.
conn = mysql.connector.Connect(
    host="127.0.0.1",
    port=3306,
    user="root",
    password="root",
    database="employees",
)

# Multi-line SQL kept in a triple-quoted string. The concatenated name column is
# deliberately left without an alias; it is renamed on the DataFrame below.
query = '''
SELECT 
    emp_no as Emp_Id,
    concat(first_name," ",last_name),
    gender,
    hire_date
FROM 
    employees;
'''

# Load the result set and always release the connection, even when the read
# fails, so no session stays open for the lifetime of the kernel.
try:
    df = pd.read_sql(query, conn)
finally:
    conn.close()

# Rename in pandas instead of aliasing in SQL. rename() returns a new frame,
# which is bound back to `df`; re-running the cell gives the same result.
df = df.rename(
    columns={
        'concat(first_name," ",last_name)': "Name",
        "gender": "Gender",
        "hire_date": "Hire_Date",
    }
)

# Show the last five rows of the table.
df.tail(5)

  df = pd.read_sql(query, conn)


Unnamed: 0,Emp_Id,Name,Gender,Hire_Date
300019,499995,Dekang Lichtner,F,1993-01-12
300020,499996,Zito Baaz,M,1990-09-27
300021,499997,Berhard Lenart,M,1986-04-21
300022,499998,Patricia Breugel,M,1993-10-13
300023,499999,Sachin Tsukuda,M,1997-11-30


In [81]:
# Improvement 1 - Using Classes to improve code Cleanliness, Reusability, Scalability, Maintainability, Extensibility.

import mysql.connector
import pandas as pd

class MySQL:
    """Holds MySQL connection settings and opens a connection on request.

    host, port, user and password default to the local server values used
    throughout this notebook, so callers usually pass only ``database``.
    """

    def __init__(self, host="127.0.0.1", port=3306, user="root", password="root", database=None):
        """Store the connection settings; nothing is connected at this point."""
        self.host, self.port = host, port
        self.user, self.password = user, password
        self.database = database

    def connect(self):
        """Open a new connection from the stored settings and return it."""
        settings = {
            "host": self.host,
            "port": self.port,
            "user": self.user,
            "password": self.password,
            "database": self.database,
        }
        return mysql.connector.Connect(**settings)

# Employee listing. The concatenated name column is left without an alias and
# is renamed on the DataFrame after loading.
query_1 = """
SELECT 
    emp_no as Emp_Id,
    concat(first_name," ",last_name),
    gender,
    hire_date
FROM 
    employees;
"""

# Only the schema is passed; host, port, user and password use the class defaults.
employees = MySQL(database = "employees")
conn = employees.connect()

# The caller owns this connection, so close it here even if the read fails.
try:
    df = pd.read_sql(query_1, conn)
finally:
    conn.close()

# rename() returns a new frame, which is bound back to `df`.
df = df.rename(
    columns={
        'concat(first_name," ",last_name)': "Name",
        "gender": "Gender",
        "hire_date": "Hire_Date",
    }
)
print(df.head(5))
print(df.tail(5))

  df = pd.read_sql(query_1, conn)


   Emp_Id               Name Gender   Hire_Date
0   10001     Georgi Facello      M  1986-06-26
1   10002     Bezalel Simmel      F  1985-11-21
2   10003      Parto Bamford      M  1986-08-28
3   10004  Chirstian Koblick      M  1986-12-01
4   10005   Kyoichi Maliniak      M  1989-09-12
        Emp_Id              Name Gender   Hire_Date
300019  499995   Dekang Lichtner      F  1993-01-12
300020  499996         Zito Baaz      M  1990-09-27
300021  499997    Berhard Lenart      M  1986-04-21
300022  499998  Patricia Breugel      M  1993-10-13
300023  499999    Sachin Tsukuda      M  1997-11-30


In [82]:
"""
Improvement 2
1. Defined a read_query method inside the class. read_query opens the connection itself, so the user doesn't have to create one. This demonstrates data abstraction.
2. Created a dictionary of DB credentials. This way we can define and use more than one database easily.
"""

import mysql.connector
import pandas as pd

# Connection profiles keyed by environment name; a profile is unpacked into
# MySQL(...) with ** at the call site.
# Every value can be overridden through an environment variable, so real
# credentials do not have to be written into the notebook. The fallbacks are
# the demo values used so far, so nothing changes when no variable is set.
import os  # kept next to its only use in this cell

db_config = {
    "local": {
        "host": os.environ.get("MYSQL_LOCAL_HOST", "127.0.0.1"),
        "port": int(os.environ.get("MYSQL_LOCAL_PORT", "3306")),
        "user": os.environ.get("MYSQL_LOCAL_USER", "root"),
        "password": os.environ.get("MYSQL_LOCAL_PASSWORD", "root"),
    },
    "remote": {
        "host": os.environ.get("MYSQL_REMOTE_HOST", "127.34.45.15"),
        "port": int(os.environ.get("MYSQL_REMOTE_PORT", "3306")),
        "user": os.environ.get("MYSQL_REMOTE_USER", "root"),
        "password": os.environ.get("MYSQL_REMOTE_PASSWORD", "root1234"),
    },
}

class MySQL:
    """MySQL settings holder whose read_query manages its own connection.

    Parameters
    ----------
    host, port, user, password
        Server address and credentials, passed through to
        ``mysql.connector.Connect``.
    database
        Optional default schema for the connection.
    """

    def __init__(self, host, port, user, password, database = None):
        """Store the connection settings; no connection is opened here."""
        self.host = host
        self.port =  port
        self.user =  user
        self.password = password
        self.database = database

    def connect(self):
        """Open and return a new connection built from the stored settings.

        The caller is responsible for closing the returned connection.
        """
        conn = mysql.connector.Connect(
            host = self.host,
            port = self.port,
            user = self.user,
            password = self.password,
            database = self.database
        )
        return conn

    def read_query(self, query):
        """Run ``query`` on a fresh connection and return the rows as a DataFrame.

        The connection is opened for this call only and is closed before
        returning, including when ``pd.read_sql`` raises; the exception then
        propagates to the caller.
        """
        conn = self.connect()
        try:
            return pd.read_sql(query, conn)
        finally:
            # Runs on success and on failure, so a bad query cannot leak the connection.
            conn.close()


# Employee listing; the concatenated name column gets its label in pandas below.
query_1 = """
SELECT 
    emp_no as Emp_Id,
    concat(first_name," ",last_name),
    gender,
    hire_date
FROM 
    employees;
"""

# Build the client from the "local" profile; read_query opens and closes the
# connection on its own.
employees = MySQL(database="employees", **db_config["local"])
df = employees.read_query(query_1)

# Friendlier column labels, applied to the existing frame.
column_labels = {
    'concat(first_name," ",last_name)': "Name",
    "gender": "Gender",
    "hire_date": "Hire_Date",
}
df.rename(columns=column_labels, inplace=True)

# First and last five rows.
for preview in (df.head(5), df.tail(5)):
    print(preview)

  df = pd.read_sql(query, conn)


   Emp_Id               Name Gender   Hire_Date
0   10001     Georgi Facello      M  1986-06-26
1   10002     Bezalel Simmel      F  1985-11-21
2   10003      Parto Bamford      M  1986-08-28
3   10004  Chirstian Koblick      M  1986-12-01
4   10005   Kyoichi Maliniak      M  1989-09-12
        Emp_Id              Name Gender   Hire_Date
300019  499995   Dekang Lichtner      F  1993-01-12
300020  499996         Zito Baaz      M  1990-09-27
300021  499997    Berhard Lenart      M  1986-04-21
300022  499998  Patricia Breugel      M  1993-10-13
300023  499999    Sachin Tsukuda      M  1997-11-30


In [1]:
"""
Improvement 3 -
1. Instead of closing connections manually, built a context manager.
As soon as the with statement is entered, the connection to the DB is established.
On exit from the block, the connection is closed.
"""

import mysql.connector
import pandas as pd

# Connection profiles keyed by environment name; a profile is unpacked into
# MySQL(...) with ** at the call site.
# Every value can be overridden through an environment variable, so real
# credentials do not have to be written into the notebook. The fallbacks are
# the demo values used so far, so nothing changes when no variable is set.
import os  # kept next to its only use in this cell

db_config = {
    "local": {
        "host": os.environ.get("MYSQL_LOCAL_HOST", "127.0.0.1"),
        "port": int(os.environ.get("MYSQL_LOCAL_PORT", "3306")),
        "user": os.environ.get("MYSQL_LOCAL_USER", "root"),
        "password": os.environ.get("MYSQL_LOCAL_PASSWORD", "root"),
    },
    "remote": {
        "host": os.environ.get("MYSQL_REMOTE_HOST", "127.34.45.15"),
        "port": int(os.environ.get("MYSQL_REMOTE_PORT", "3306")),
        "user": os.environ.get("MYSQL_REMOTE_USER", "root"),
        "password": os.environ.get("MYSQL_REMOTE_PASSWORD", "root1234"),
    },
}

class MySQL:
    """Context manager around a single MySQL connection.

    Entering the ``with`` block opens the connection, leaving it closes the
    connection again. Exceptions raised inside the block are reported and then
    propagate to the caller.

    Parameters
    ----------
    host, port, user, password
        Server address and credentials, passed through to
        ``mysql.connector.Connect``.
    database
        Optional default schema for the connection.
    """

    def __init__(self, host, port, user, password, database = None):
        """Store the connection settings; no connection is opened here."""
        self.host = host
        self.port =  port
        self.user =  user
        self.password = password
        self.database = database
        # The live connection is stored under its own name so that it cannot
        # replace the conn() method on the instance.
        self._connection = None

    def conn(self):
        """Open the connection, remember it on the instance and return it.

        Any error raised while connecting is printed and re-raised.
        """
        try:
            self._connection = mysql.connector.Connect(
                host = self.host,
                port = self.port,
                user = self.user,
                password = self.password,
                database = self.database
            )
            print("Connection established.")
        except Exception as e:
            print("Exception happend while establishing connection: ", e)
            raise
        return self._connection

    def __enter__(self):
        """Open the connection and return this object for use in the block."""
        self.conn()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the connection; never suppress an exception from the block."""
        if exc_type:
            print("Exception happened: ", exc_value)
        if self._connection is not None:
            try:
                self._connection.close()
            finally:
                # Forget the handle so a later read_query fails with a clear message.
                self._connection = None
            print("Connection Closed")
        # Returning False lets the exception propagate. Suppressing it would let
        # the notebook carry on with a stale `df` left over from an earlier cell.
        return False

    def read_query(self, query):
        """Run ``query`` on the open connection and return a DataFrame.

        Raises
        ------
        RuntimeError
            If no connection is open, i.e. the call is made outside a
            ``with`` block and ``conn()`` has not been called.
        """
        if self._connection is None:
            raise RuntimeError(
                "No open connection: use 'with MySQL(...) as db:' or call conn() first."
            )
        df = pd.read_sql(query, self._connection)
        return df


# Employee listing; the concatenated name column gets its label in pandas below.
query_1 = """
SELECT 
    emp_no as Emp_Id,
    concat(first_name," ",last_name),
    gender,
    hire_date
FROM 
    employees;
"""

# The connection lives only for the duration of the with-block.
with MySQL(database="employees", **db_config["local"]) as employees:
    df = employees.read_query(query_1)

# Only the name column is relabelled here; gender and hire_date keep their SQL names.
name_label = {'concat(first_name," ",last_name)': 'Name'}
df.rename(columns=name_label, inplace=True)

# First and last five rows.
for preview in (df.head(5), df.tail(5)):
    print(preview)

Connection established.


  df = pd.read_sql(query, self.conn)


Connection Closed
   Emp_Id               Name gender   hire_date
0   10001     Georgi Facello      M  1986-06-26
1   10002     Bezalel Simmel      F  1985-11-21
2   10003      Parto Bamford      M  1986-08-28
3   10004  Chirstian Koblick      M  1986-12-01
4   10005   Kyoichi Maliniak      M  1989-09-12
        Emp_Id              Name gender   hire_date
300019  499995   Dekang Lichtner      F  1993-01-12
300020  499996         Zito Baaz      M  1990-09-27
300021  499997    Berhard Lenart      M  1986-04-21
300022  499998  Patricia Breugel      M  1993-10-13
300023  499999    Sachin Tsukuda      M  1997-11-30


In [93]:
# Summarise the frame: index range, per-column non-null counts, dtypes and memory usage.
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 300024 entries, 0 to 300023
Data columns (total 4 columns):
 #   Column     Non-Null Count   Dtype 
---  ------     --------------   ----- 
 0   Emp_Id     300024 non-null  int64 
 1   Name       300024 non-null  object
 2   gender     300024 non-null  object
 3   hire_date  300024 non-null  object
dtypes: int64(1), object(3)
memory usage: 9.2+ MB


In [120]:
#-- Last 5 employees when the full names are sorted alphabetically.

with MySQL(database="employees", **db_config["local"]) as employees:
    df = employees.read_query(query_1)

# The SQL leaves the concatenated name column unaliased, so label it here.
name_label = {'concat(first_name," ",last_name)': 'Name'}
df.rename(columns=name_label, inplace=True)

# Keep the name column under its own variable; the next cell reuses it.
df_name = df['Name']
df_name.sort_values().tail(5)

Connection established.


  df = pd.read_sql(query, self.conn)


Connection Closed


24869      Zvonko Yoshizawa
130730         Zvonko Yurov
99488     Zvonko Zambonelli
182120         Zvonko Zobel
147573       Zvonko Zuberek
Name: Name, dtype: object

In [121]:
#-- List the last 5 employees in the data, in the order the query returned them
#-- (no sorting here, unlike the previous cell). Reuses df_name from that cell.

df_name.tail(5)

300019     Dekang Lichtner
300020           Zito Baaz
300021      Berhard Lenart
300022    Patricia Breugel
300023      Sachin Tsukuda
Name: Name, dtype: object

In [149]:
#-- Employees joined to their salary rows, highest emp_no first.
#-- The query has no LIMIT: every distinct (employee, salary) pair is loaded and
#-- the "last 5" selection happens later in pandas.
query_2 = """
SELECT DISTINCT
	e.emp_no,
	concat(e.first_name, " ", e.last_name) as Name,
    s.salary
FROM
	employees e
JOIN
	salaries s
    ON e.emp_no = s.emp_no
ORDER BY
	e.emp_no DESC
"""

# The connection lives only for the duration of the with-block.
with MySQL(database="employees", **db_config["local"]) as employees:
    df = employees.read_query(query_2)

Connection established.


  df = pd.read_sql(query, self.conn)


Connection Closed


In [150]:
# One row per employee: the row that holds that employee's highest salary.
# groupby(...).idxmax() yields, for each emp_no, the index label of the row with
# the maximum salary; .loc then pulls those rows out of `df`.
# sort_values() is chained rather than applied in place, so the derived frame is
# built in one expression and re-running the cell gives the same result.
df_highest_salary = (
    df.loc[df.groupby('emp_no')['salary'].idxmax()]
    .sort_values(by='emp_no')
)
df_highest_salary.tail(5)

Unnamed: 0,emp_no,Name,salary
2843416,499995,Dekang Lichtner,52868
2843423,499996,Zito Baaz,69501
2843438,499997,Berhard Lenart,83441
2843447,499998,Patricia Breugel,55003
2843452,499999,Sachin Tsukuda,77303


In [None]:
# Same employee/salary join as above, without the ORDER BY clause.
query_2 = """
SELECT DISTINCT
	e.emp_no,
	concat(e.first_name, " ", e.last_name) as Name,
    s.salary
FROM
	employees e
JOIN
	salaries s
    ON e.emp_no = s.emp_no
"""

# The connection lives only for the duration of the with-block.
with MySQL(database="employees", **db_config["local"]) as employees:
    df = employees.read_query(query_2)

# Index label of the highest-salary row for each employee, then those rows.
top_salary_rows = df.groupby('emp_no')['salary'].idxmax()
df.loc[top_salary_rows]

In [152]:
pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.5.tar.gz (317.2 MB)
     ---------------------------------------- 0.0/317.2 MB ? eta -:--:--
     ---------------------------------------- 1.3/317.2 MB 8.1 MB/s eta 0:00:40
     ---------------------------------------- 3.4/317.2 MB 9.1 MB/s eta 0:00:35
      --------------------------------------- 5.5/317.2 MB 9.5 MB/s eta 0:00:33
      --------------------------------------- 7.6/317.2 MB 9.6 MB/s eta 0:00:33
     - -------------------------------------- 9.4/317.2 MB 9.6 MB/s eta 0:00:32
     - ------------------------------------- 11.5/317.2 MB 9.6 MB/s eta 0:00:32
     - ------------------------------------- 13.9/317.2 MB 9.7 MB/s eta 0:00:32
     - ------------------------------------- 16.0/317.2 MB 9.7 MB/s eta 0:00:32
     -- ------------------------------------ 17.8/317.2 MB 9.7 MB/s eta 0:00:31
     -- ------------------------------------ 19.9/317.2 MB 9.6 MB/s eta 0:00:31
     -- ------------------------------------ 21.8/317.2 MB 9.7

In [3]:
import pyspark
pyspark.__version__

'3.5.5'

In [6]:
from pyspark.sql import SparkSession

# Spark session with the MySQL JDBC driver jar on the classpath.
# NOTE(review): the jar path is an absolute, machine-specific Windows path; confirm
# it exists on the machine that runs this cell.
# NOTE(review): a JAVA_GATEWAY_EXITED error is raised before any database access;
# it presumably means the JVM could not be started. Verify the Java install and JAVA_HOME.
spark = (
    SparkSession.builder
    .appName("MySQL Example")
    .config("spark.jars", r"C:\mysql-connector-j-9.2.0\mysql-connector-j-9.2.0.jar")
    .getOrCreate()
)

# Take host, port, user and password from the "local" profile so the JDBC reader
# uses the same credentials as the pandas cells instead of a second hard-coded copy.
local_db = db_config["local"]

df = spark.read.format("jdbc").options(
    url=f"jdbc:mysql://{local_db['host']}:{local_db['port']}/employees",
    driver="com.mysql.cj.jdbc.Driver",
    dbtable="salaries",
    user=local_db["user"],
    password=local_db["password"],
).load()

# Print the first five rows of the salaries table.
df.show(5)


PySparkRuntimeError: [JAVA_GATEWAY_EXITED] Java gateway process exited before sending its port number.

In [10]:
# Comma-separated list of skills.
string = "Python, OOPs, Custom Exceptions, ORM, SQL, MySQL, Power BI, Pandas, Numpy, Matplotlib, Kubernetes, Docker, Redis Queue, Redis Pub/Sub, Git, GitLab, Bash, AWS Lambda, Azure Data Factory, Databricks, Agile Delivery"

# Re-join the same items with " | " as the separator.
string = " | ".join(string.split(", "))
string

'Python | OOPs | Custom Exceptions | ORM | SQL | MySQL | Power BI | Pandas | Numpy | Matplotlib | Kubernetes | Docker | Redis Queue | Redis Pub/Sub | Git | GitLab | Bash | AWS Lambda | Azure Data Factory | Databricks | Agile Delivery'