In [1]:
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--driver-class-path /usr/local/sqoop/lib/mysql-connector-java-5.1.47-bin.jar \
--jars /usr/local/sqoop/lib/mysql-connector-java-5.1.47/mysql-connector-java-5.1.47-bin.jar pyspark-shell'

In [2]:
import findspark
findspark.init('/usr/local/spark/')

In [3]:
from pyspark.sql import SparkSession

In [5]:
spark = SparkSession.builder\
                    .appName('MySQL RW').master('local[*]')\
                    .getOrCreate()

In [8]:
empDf = spark.read.format('jdbc')\
            .option('url','jdbc:mysql://localhost:3306/saif_db?useSSL=False')\
            .option('driver','com.mysql.jdbc.Driver')\
            .option('user','root')\
            .option('password','root')\
            .option('dbtable','emp')\
            .load()
empDf.show(truncate=False)

+-----+------+---------+----+----------+-------+-------+------+
|empno|ename |job      |mgr |hiredate  |sal    |comm   |deptno|
+-----+------+---------+----+----------+-------+-------+------+
|7369 |SMITH |CLERK    |7902|1980-12-17|800.00 |null   |20    |
|7499 |ALLEN |SALESMAN |7698|1981-02-20|1600.00|300.00 |30    |
|7521 |WARD  |SALESMAN |7698|1981-02-22|1250.00|500.00 |30    |
|7566 |JONES |MANAGER  |7839|1981-04-02|2975.00|null   |20    |
|7654 |MARTIN|SALESMAN |7698|1981-09-28|1250.00|1400.00|30    |
|7698 |BLAKE |MANAGER  |7839|1981-05-01|2850.00|null   |30    |
|7782 |CLARK |MANAGER  |7839|1981-06-09|2450.00|null   |10    |
|7788 |SCOTT |ANALYST  |7566|1982-12-09|3000.00|null   |20    |
|7839 |KING  |PRESIDENT|null|1981-11-17|5000.00|null   |10    |
|7844 |TURNER|SALESMAN |7698|1981-09-08|1500.00|0.00   |30    |
|7876 |ADAMS |CLERK    |7788|1983-01-12|1100.00|null   |20    |
|7900 |JAMES |CLERK    |7698|1981-12-03|950.00 |null   |30    |
|7902 |FORD  |ANALYST  |7566|1981-12-03|

In [10]:
# Spark SQL:
# We need to convert DF to Temp Table:

empDf.createOrReplaceTempView('emp')
spark.sql('select * from emp where deptno in (10,20)').show(5)

+-----+-----+---------+----+----------+-------+----+------+
|empno|ename|      job| mgr|  hiredate|    sal|comm|deptno|
+-----+-----+---------+----+----------+-------+----+------+
| 7369|SMITH|    CLERK|7902|1980-12-17| 800.00|null|    20|
| 7566|JONES|  MANAGER|7839|1981-04-02|2975.00|null|    20|
| 7782|CLARK|  MANAGER|7839|1981-06-09|2450.00|null|    10|
| 7788|SCOTT|  ANALYST|7566|1982-12-09|3000.00|null|    20|
| 7839| KING|PRESIDENT|null|1981-11-17|5000.00|null|    10|
+-----+-----+---------+----+----------+-------+----+------+
only showing top 5 rows



In [11]:
resDf = spark.sql("""
                select 
                    a.empno as emp_empno,
                    a.ename as emp_ename,
                    b.empno as mgr_empno,
                    b.ename as mgr_ename,
                    c.emp_cnt as mgr_team_cnt
                from emp a, emp b, (select 
                                    a.mgr, count(a.ename) as emp_cnt
                                    from emp a, emp b
                                    where a.mgr = b.empno
                                    group by a.mgr) c
                where a.mgr = b.empno
                and b.empno = c.mgr
                order by b.empno
                """)
resDf.show()

+---------+---------+---------+---------+------------+
|emp_empno|emp_ename|mgr_empno|mgr_ename|mgr_team_cnt|
+---------+---------+---------+---------+------------+
|     7902|     FORD|     7566|    JONES|           2|
|     7788|    SCOTT|     7566|    JONES|           2|
|     7499|    ALLEN|     7698|    BLAKE|           5|
|     7521|     WARD|     7698|    BLAKE|           5|
|     7654|   MARTIN|     7698|    BLAKE|           5|
|     7844|   TURNER|     7698|    BLAKE|           5|
|     7900|    JAMES|     7698|    BLAKE|           5|
|     7934|   MILLER|     7782|    CLARK|           1|
|     7876|    ADAMS|     7788|    SCOTT|           1|
|     7698|    BLAKE|     7839|     KING|           3|
|     7566|    JONES|     7839|     KING|           3|
|     7782|    CLARK|     7839|     KING|           3|
|     7369|    SMITH|     7902|     FORD|           1|
+---------+---------+---------+---------+------------+



In [12]:
resDf.write.mode('overwrite').format('jdbc')\
    .option('url','jdbc:mysql://localhost:3306/saif_db?useSSL=False')\
    .option('user','root')\
    .option('password','root')\
    .option('driver','com.mysql.jdbc.Driver')\
    .option('dbtable','emp_mgr_cnt')\
    .save()