In [1]:
# importando as bibliotecas necessarias
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql.types import StructType, StructField, StringType, LongType, BooleanType

In [2]:
# criando a sessão do spark
spSession = SparkSession.builder.master("local").appName("casePasseiDireto").getOrCreate()

In [3]:
# criando o Sql Context
sqlContext = SQLContext(sc)

In [6]:
# carregando o arquivo
courses = spSession.read.json("Datasets/BASE A/courses.json")

In [7]:
courses.show()

+-------+--------------------+
|     Id|                Name|
+-------+--------------------+
|1199555| Engenharia Elétrica|
|1199521|Economia / Ciênci...|
|1199517|             Direito|
|1199491| Ciências Ambientais|
|1199573|  Engenharia Química|
|1199553|Engenharia de Pro...|
|1199536|Engenharia Ambiental|
|1199725|             Química|
|1199453|       Administração|
|1199701|         Odontologia|
|1199461|           Agronomia|
|1199741|      Serviço Social|
|1199532|Engenharia Aeroná...|
|1199544|    Engenharia Civil|
|1199724|Comunicação Socia...|
|1199704|           Pedagogia|
|1199699|            Nutrição|
|6495411|      Gestão Pública|
|1199687|            Medicina|
|1199734|Relações Internac...|
+-------+--------------------+
only showing top 20 rows



In [8]:
courses.printSchema()

root
 |-- Id: long (nullable = true)
 |-- Name: string (nullable = true)



In [9]:
courses.registerTempTable("courses")

In [10]:
spSession.sql("""
    SELECT
        NAME, 
        COUNT(*)
    FROM
        courses
    GROUP
        BY NAME
    HAVING COUNT(*) > 1
""").show()

+----+--------+
|NAME|count(1)|
+----+--------+
+----+--------+



In [11]:
# carregando o arquivo
sessions = spSession.read.json("Datasets/BASE A/sessions.json")

In [12]:
sessions.show()

+-------------------+-------------+--------------------+
|   SessionStartTime|StudentClient|           StudentId|
+-------------------+-------------+--------------------+
|2017-11-18 15:47:33|      Website|0cade9bf00234e378...|
|2017-11-20 22:21:13|      Website|0cade9bf00234e378...|
|2017-11-20 22:35:31|      Website|0cade9bf00234e378...|
|2017-11-20 23:35:46|      Website|0cade9bf00234e378...|
|2017-11-23 21:24:00|      Website|0cade9bf00234e378...|
|2017-11-24 02:08:08|      Website|0cade9bf00234e378...|
|2017-11-13 13:12:49|      Website|8a501cab6c0a5a7e9...|
|2017-11-06 21:34:45|      Website|b8a39150d98d74685...|
|2017-11-20 15:26:09|      Website|b8a39150d98d74685...|
|2017-11-20 21:40:32|      Website|b8a39150d98d74685...|
|2017-11-28 21:40:03|      Website|b8a39150d98d74685...|
|2017-11-02 11:04:17|      Website|0f6c90f966a70b84b...|
|2017-11-03 14:19:56|      Website|0f6c90f966a70b84b...|
|2017-11-06 11:59:36|      Website|0f6c90f966a70b84b...|
|2017-11-09 15:09:56|      Webs

In [13]:
sessions.printSchema()

root
 |-- SessionStartTime: string (nullable = true)
 |-- StudentClient: string (nullable = true)
 |-- StudentId: string (nullable = true)



In [14]:
sessions.registerTempTable("sessions")

In [15]:
spSession.sql("""
    SELECT
        *
    FROM
        sessions
    WHERE
        StudentId IS NULL
""").show()

+----------------+-------------+---------+
|SessionStartTime|StudentClient|StudentId|
+----------------+-------------+---------+
+----------------+-------------+---------+



In [16]:
spSession.sql("""
    SELECT
        StudentClient, 
        COUNT(*)
    FROM
        sessions
    GROUP
        BY StudentClient
    HAVING COUNT(*) > 1
""").show()

+-------------+--------+
|StudentClient|count(1)|
+-------------+--------+
|       Webapp|   13299|
|          iOS|   34798|
|      Website|  158674|
|      Android|   77170|
+-------------+--------+



In [17]:
# carregando o arquivo
student_follow_subject = spSession.read.json("Datasets/BASE A/student_follow_subject.json")

In [18]:
student_follow_subject.show()

+--------------------+--------------------+---------+
|          FollowDate|           StudentId|SubjectId|
+--------------------+--------------------+---------+
|2015-09-07 15:49:...|0cade9bf00234e378...|   682889|
|2015-09-07 15:49:...|0cade9bf00234e378...|   684377|
|2015-09-07 15:49:...|0cade9bf00234e378...|   698583|
|2015-09-07 15:49:...|0cade9bf00234e378...|   757882|
|2015-09-07 15:49:...|0cade9bf00234e378...|  1910475|
|2015-12-07 16:17:...|8a501cab6c0a5a7e9...|   673956|
|2015-12-07 16:17:...|8a501cab6c0a5a7e9...|   674245|
|2015-12-07 16:17:...|8a501cab6c0a5a7e9...|   681478|
|2015-12-07 16:17:...|8a501cab6c0a5a7e9...|   681925|
|2015-12-07 16:17:...|8a501cab6c0a5a7e9...|   693487|
|2016-05-30 10:31:...|b8a39150d98d74685...|   670492|
|2016-05-30 10:31:...|b8a39150d98d74685...|   671126|
|2016-05-30 10:31:...|b8a39150d98d74685...|   686806|
|2016-05-30 10:31:...|b8a39150d98d74685...|   689481|
|2016-05-30 10:31:...|b8a39150d98d74685...|  1663821|
|2017-11-03 12:23:...|0f6c90

In [19]:
student_follow_subject.printSchema()

root
 |-- FollowDate: string (nullable = true)
 |-- StudentId: string (nullable = true)
 |-- SubjectId: long (nullable = true)



In [20]:
student_follow_subject.registerTempTable("student_follow_subject")

In [21]:
spSession.sql("""
    SELECT
        *
    FROM
        student_follow_subject
    WHERE
        StudentId IS NULL
    OR
        SubjectId IS NULL
""").show()

+----------+---------+---------+
|FollowDate|StudentId|SubjectId|
+----------+---------+---------+
+----------+---------+---------+



In [82]:
# carregando o arquivo
students = spSession.read.json("Datasets/BASE A/students.json")

In [83]:
students.show()

+----------------+--------+--------------------+--------------------+------------+----------------+-------------+------------+
|            City|CourseId|                  Id|      RegisteredDate|SignupSource|           State|StudentClient|UniversityId|
+----------------+--------+--------------------+--------------------+------------+----------------+-------------+------------+
|  Rio de Janeiro| 1199555|0cade9bf00234e378...|2012-09-06 17:27:...|    Facebook|  Rio de Janeiro|         null|      664704|
|  Rio de Janeiro| 1199521|8a501cab6c0a5a7e9...|2012-09-05 15:31:...|    Facebook|  Rio de Janeiro|         null|      664704|
|            null| 1199517|b8a39150d98d74685...|2012-08-17 10:00:...|    Facebook|            null|         null|      661625|
|  Rio de Janeiro| 1199491|0f6c90f966a70b84b...|2012-10-22 21:16:...|    Facebook|  Rio de Janeiro|         null|      664768|
|            null| 1199573|388bf84cffb321938...|2012-10-24 01:48:...|    Facebook|    Minas Gerais|         nul

In [84]:
students.printSchema()

root
 |-- City: string (nullable = true)
 |-- CourseId: long (nullable = true)
 |-- Id: string (nullable = true)
 |-- RegisteredDate: string (nullable = true)
 |-- SignupSource: string (nullable = true)
 |-- State: string (nullable = true)
 |-- StudentClient: string (nullable = true)
 |-- UniversityId: long (nullable = true)



In [85]:
students.registerTempTable("students")

In [26]:
spSession.sql("""
    SELECT
        CITY
    FROM
        students
    WHERE
        ID IS NULL 
    OR
        CourseId IS NULL
""").show()

+----+--------+---+--------------+------------+-----+-------------+------------+
|City|CourseId| Id|RegisteredDate|SignupSource|State|StudentClient|UniversityId|
+----+--------+---+--------------+------------+-----+-------------+------------+
+----+--------+---+--------------+------------+-----+-------------+------------+



In [27]:
# carregando o arquivo
subjects = spSession.read.json("Datasets/BASE A/subjects.json")

In [28]:
subjects.show()

+-------+--------------------+
|     Id|                Name|
+-------+--------------------+
| 682889|       Eletrônica II|
| 684377|Circuitos Elétric...|
| 698583|          Eletrônica|
| 757882|        Circuitos II|
|1910475|Circuitos Elétric...|
| 673956|Teoria Microeconô...|
| 674245|       Econometria I|
| 681478|     Microeconomia 2|
| 681925|     Microeconomia 1|
| 693487|Métodos Quantitat...|
| 670492|Direito Administr...|
| 671126|Direito Constituc...|
| 686806|             Direito|
| 689481|     Direito Civil I|
|1663821|Extras (livros, C...|
| 670937|   Direito Ambiental|
| 671197|         Estatística|
| 673846|            Ecologia|
| 674226|    Geoprocessamento|
| 695790|Sistemas de Infor...|
+-------+--------------------+
only showing top 20 rows



In [29]:
subjects.printSchema()

root
 |-- Id: long (nullable = true)
 |-- Name: string (nullable = true)



In [30]:
subjects.registerTempTable("subjects")

In [31]:
spSession.sql("""
    SELECT
        *
    FROM
        subjects
    WHERE
        ID IS NULL 
""").show()

+---+----+
| Id|Name|
+---+----+
+---+----+



In [32]:
# carregando o arquivo
subscriptions = spSession.read.json("Datasets/BASE A/subscriptions.json")

In [33]:
subscriptions.show()

+--------------------+--------+--------------------+
|         PaymentDate|PlanType|           StudentId|
+--------------------+--------+--------------------+
|2017-11-14 19:52:...|  Mensal|29037b0a52c5b576d...|
|2017-11-08 11:52:...|  Mensal|b2bace77d15c3dfaf...|
|2017-11-05 21:27:...|  Mensal|f423d6fe2f8964db6...|
|2017-11-15 14:36:...|  Mensal|55ccbe518d2edbbd5...|
|2017-11-12 22:19:...|  Mensal|b1b0f63fe3e4820cb...|
|2017-11-22 01:03:...|  Mensal|ed46832f6b716fb2e...|
|2017-11-24 19:03:...|  Mensal|69b7bee32821cf76b...|
|2017-11-11 21:01:...|  Mensal|6553923125fe6364e...|
|2017-11-12 16:41:...|  Mensal|3903a334d1af8ce83...|
|2017-11-21 11:52:...|  Mensal|4487f81a4ea9b3c3c...|
|2017-11-06 22:14:...|  Mensal|bde8436a92ab53ce2...|
|2017-11-17 19:30:...|  Mensal|3ae094228e1b9324e...|
|2017-11-10 22:33:...|  Mensal|5a0df378119b95d3c...|
|2017-11-08 22:06:...|  Mensal|fed3de568756e019f...|
|2017-11-01 18:14:...|  Mensal|68beec3e289e3316e...|
|2017-11-10 15:22:...|  Mensal|ecb4e29cd9585b0

In [34]:
subscriptions.printSchema()

root
 |-- PaymentDate: string (nullable = true)
 |-- PlanType: string (nullable = true)
 |-- StudentId: string (nullable = true)



In [35]:
subscriptions.registerTempTable("subscriptions")

In [36]:
spSession.sql("""
    SELECT
        *
    FROM
        subscriptions
    WHERE
        StudentId IS NULL 
""").show()

+-----------+--------+---------+
|PaymentDate|PlanType|StudentId|
+-----------+--------+---------+
+-----------+--------+---------+



In [37]:
spSession.sql("""
    SELECT
        PLANTYPE, 
        COUNT(*)
    FROM
        SUBSCRIPTIONS
    GROUP
        BY PLANTYPE
""").show()

+--------+--------+
|PLANTYPE|count(1)|
+--------+--------+
|  Mensal|     739|
|   Anual|      60|
+--------+--------+



In [40]:
# carregando arquivo
universities = spSession.read.json("Datasets/BASE A/universities.json")

In [42]:
universities.show()

+------+-----------+
|    Id|       Name|
+------+-----------+
|664704|       UERJ|
|661625|    PUC-RIO|
|664768|     UNIRIO|
|663106|       UFSJ|
|664138|       UFSC|
|664742|       UFRJ|
|663609|     PUC-PR|
|663054|       UFOP|
|663065|        UFV|
|664565|ESTÁCIO EAD|
|665419|     UNIFOA|
|662926|        UNB|
|664240|     UFSCAR|
|663134|  PUC-MINAS|
|663075|        UFU|
|664623|    ESTÁCIO|
|665677|        UVV|
|662981|        UFG|
|663165|       UFMG|
|662944|       UFES|
+------+-----------+
only showing top 20 rows



In [43]:
universities.printSchema()

root
 |-- Id: long (nullable = true)
 |-- Name: string (nullable = true)



In [44]:
universities.registerTempTable("universities")

In [45]:
spSession.sql("""
    SELECT
        *
    FROM
        universities
    WHERE
        id IS NULL
""").show()

+---+----+
| Id|Name|
+---+----+
+---+----+



In [46]:
spSession.sql("""
    SELECT
        Name,
        COUNT(*)
    FROM
        universities
    GROUP
        BY Name
    HAVING
        COUNT(*) > 1
""").show()

+---------+--------+
|     Name|count(1)|
+---------+--------+
|     IFES|       3|
|   FAFIPA|       2|
|     FASF|       2|
|     IESB|       2|
|     IBES|       2|
|    UNORP|       2|
|     FAIT|       2|
|       FC|       3|
|    ITPAC|       5|
|      FAR|       4|
|     FAPI|       4|
|      FSM|       2|
|   UNIPAC|       4|
|      FIB|       3|
|    FAMEC|       2|
|  PROMOVE|       3|
|   FACSUL|       3|
|UNIBRATEC|       2|
|      FAA|       3|
|      FAL|       3|
+---------+--------+
only showing top 20 rows



In [62]:
# carregando arquivo
events = spSession.read.json("Datasets/BASE B/*.json")

In [63]:
events.show()

+-------------------+-------------+---------------+--------------------+---------------+--------------------+-------------------+----------------+--------------------+---------+---------+-------+-----------+--------------------+--------+-----------+----------+-------------------+--------------------+--------+-----------+--------------------+----------------+----------------+------------+---------+---+-------+-----------+------+--------------------+--------------------+----+---------+--------------------+
|  Last Accessed Url|Page Category|Page Category 1|     Page Category 2|Page Category 3|           Page Name|                 at|         browser|             carrier|city_name|clv_total|country|   custom_1|            custom_2|custom_3|   custom_4|device_new|first-accessed-page|        install_uuid|language|library_ver|  marketing_campaign|marketing_medium|marketing_source|       model|     name|nth| os_ver|   platform|region|        session_uuid|studentId_clientType|type|user_type|  

In [64]:
events = events.withColumnRenamed("Last Accessed Url", "Last_Accessed_Url") \
    .withColumnRenamed("Page Category", "Page_Category") \
    .withColumnRenamed("Page Category 1", "Page_Category_1") \
    .withColumnRenamed("Page Category 2", "Page_Category_2") \
    .withColumnRenamed("Page Category 3", "Page_Category_3") \
    .withColumnRenamed("Page Name", "Page_Name") \
    .withColumnRenamed("at", "event_at") \
    .withColumnRenamed("first-accessed-page", "first_accessed_page") \
    .withColumnRenamed("language", "event_language") \
    .withColumnRenamed("name", "event_name") \
    .withColumnRenamed("type", "event_type") \
    .withColumnRenamed("at", "event_at") \
    .withColumnRenamed("custom_1", "university") \
    .withColumnRenamed("custom_2", "course") \
    .withColumnRenamed("custom_4", "user_frequency")

In [65]:
events.printSchema()

root
 |-- Last_Accessed_Url: string (nullable = true)
 |-- Page_Category: string (nullable = true)
 |-- Page_Category_1: string (nullable = true)
 |-- Page_Category_2: string (nullable = true)
 |-- Page_Category_3: string (nullable = true)
 |-- Page_Name: string (nullable = true)
 |-- event_at: string (nullable = true)
 |-- browser: string (nullable = true)
 |-- carrier: string (nullable = true)
 |-- city_name: string (nullable = true)
 |-- clv_total: long (nullable = true)
 |-- country: string (nullable = true)
 |-- university: string (nullable = true)
 |-- course: string (nullable = true)
 |-- custom_3: string (nullable = true)
 |-- user_frequency: string (nullable = true)
 |-- device_new: boolean (nullable = true)
 |-- first_accessed_page: string (nullable = true)
 |-- install_uuid: string (nullable = true)
 |-- event_language: string (nullable = true)
 |-- library_ver: string (nullable = true)
 |-- marketing_campaign: string (nullable = true)
 |-- marketing_medium: string (nullable

In [89]:
events_step1 = spSession.sql("""
    SELECT 
        EVENT_AT,
        TO_DATE(event_at)           AS EVENT_AT_DT,
        UPPER(REPLACE(LEFT(LAST_ACCESSED_URL, INSTR(LAST_ACCESSED_URL, '?') -1), '/disciplina/', '')) AS LAST_ACCESSED_URL,
        UPPER(PAGE_CATEGORY)        AS PAGE_CATEGORY,
        UPPER(REPLACE(LEFT(PAGE_NAME, INSTR(PAGE_NAME, '?') -1), '/disciplina/', '')) AS PAGE_NAME,
        UPPER(BROWSER)              AS BROWSER,
        UPPER(CARRIER)              AS CARRIER,
        COALESCE(CLV_TOTAL, 0)      AS CLV_TOTAL,
        UPPER(USER_FREQUENCY)       AS USER_FREQUENCY,
        DEVICE_NEW,
        EVENT_LANGUAGE,
        COALESCE(COALESCE(MARKETING_CAMPAIGN, MARKETING_MEDIUM), MARKETING_SOURCE) AS MARKETING,
        UPPER(SESSION_UUID)         AS SESSION_UUID,
        UPPER(LEFT(STUDENTID_CLIENTTYPE, INSTR(STUDENTID_CLIENTTYPE, '@') -1)) AS STUDENT_ID,
        UPPER(USER_TYPE)            AS USER_TYPE,
        UPPER(PLATFORM)             AS PLATFORM,
        UPPER(UUID)                 AS UUID
    FROM 
        EVENTS
    WHERE
        LAST_ACCESSED_URL IS NOT NULL
    """)

In [90]:
events_step1.printSchema()

root
 |-- EVENT_AT: string (nullable = true)
 |-- EVENT_AT_DT: date (nullable = true)
 |-- LAST_ACCESSED_URL: string (nullable = true)
 |-- PAGE_CATEGORY: string (nullable = true)
 |-- PAGE_NAME: string (nullable = true)
 |-- BROWSER: string (nullable = true)
 |-- CARRIER: string (nullable = true)
 |-- CLV_TOTAL: long (nullable = false)
 |-- USER_FREQUENCY: string (nullable = true)
 |-- DEVICE_NEW: boolean (nullable = true)
 |-- EVENT_LANGUAGE: string (nullable = true)
 |-- MARKETING: string (nullable = true)
 |-- SESSION_UUID: string (nullable = true)
 |-- STUDENT_ID: string (nullable = true)
 |-- USER_TYPE: string (nullable = true)
 |-- PLATFORM: string (nullable = true)
 |-- UUID: string (nullable = true)



In [91]:
events_step1.registerTempTable("events_step1")

In [107]:
studants_test = spSession.sql("""
    SELECT 
       DISTINCT A.ID AS STUDENT_ID
    FROM 
        STUDENTS AS A
    """)

In [108]:
MYSQL_JDBC_DRIVE = "com.mysql.jdbc.Driver"
MYSQL_HOST = "localhost"
MYSQL_DBNAME = "stage"
MYSQL_TABLE = "STG_STUDENT"
MYSQL_USERNAME = "root"
MYSQL_PASSWORD = "1990"

In [109]:
URL = f"jdbc:mysql://{MYSQL_HOST}/{MYSQL_DBNAME}"

In [110]:
studants_test.write.format("jdbc").options(
    url = URL,
    driver = MYSQL_JDBC_DRIVE,
    dbtable = MYSQL_TABLE,
    user = MYSQL_USERNAME,
    password = MYSQL_PASSWORD).mode('append').save()