In [1]:
import os
import time

import findspark
findspark.init()
findspark.find()

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession, SQLContext, Row
import numpy as np

num_executors = 1
memory = "100g"
pyspark_submit_args = '--driver-memory ' + memory + ' pyspark-shell ' + '--num-executors ' + str(num_executors)
os.environ["PYSPARK_SUBMIT_ARGS"] = pyspark_submit_args

In [2]:
conf = SparkConf().setAppName("myApp").setMaster("local")
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)

In [3]:
df_table1 = sc.textFile("hdfs://localhost:9000/mydata/Table1.csv")
df_table2 = sc.textFile("hdfs://localhost:9000/mydata/Table2.csv")

In [4]:
df_table1.first()

'INFO,2017-03-22T20:11:49+00:00,31,ghtorrent,NULL,https://api.github.com/repos/tgstation/tgstation,NULL'

In [5]:
df_table2.first()

'60256709,https://api.github.com/repos/1010026218/Basic,34433644,Basic,Java,2017-03-21 07:57:59,35370568,0,1970-01-02 00:00:00'

In [6]:
parts1 = df_table1.map(lambda l: l.split(","))
parts2 = df_table2.map(lambda l: l.split(","))

table1 = parts1.map(lambda p: Row( \
    logging_level=str(p[0]), \
    timestamp=str(p[1]), \
    downloader_id=str(p[2]), \
    retrieval_stage=str(p[3]), \
    request_status=str(p[4]), \
    url=str(p[5]), \
    access_key=str(p[6])))

# id,url,owner_id,name,language,created_at,forked_from,deleted,updated_at
table2 = parts2.map(lambda p: Row( \
    id=str(p[0]), \
    url=str(p[1]), \
    owner_id=str(p[2]), \
    name=str(p[3]), \
    language=str(p[4]), \
    created_at=str(p[5]), \
    forked_from=str(p[6]), \
    deleted=str(p[7]), \
    updated_at=str(p[8])))

In [7]:
print("Table1")
schemaTable1 = sqlContext.createDataFrame(table1)
schemaTable1.printSchema()
schemaTable1.createOrReplaceTempView("GHTORRENT_RECORDS")

print("Table2")
schemaTable2 = sqlContext.createDataFrame(table2)
schemaTable2.printSchema()
schemaTable2.createOrReplaceTempView("IMPORTANT_REPOS")

Table1
root
 |-- access_key: string (nullable = true)
 |-- downloader_id: string (nullable = true)
 |-- logging_level: string (nullable = true)
 |-- request_status: string (nullable = true)
 |-- retrieval_stage: string (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- url: string (nullable = true)

Table2
root
 |-- created_at: string (nullable = true)
 |-- deleted: string (nullable = true)
 |-- forked_from: string (nullable = true)
 |-- id: string (nullable = true)
 |-- language: string (nullable = true)
 |-- name: string (nullable = true)
 |-- owner_id: string (nullable = true)
 |-- updated_at: string (nullable = true)
 |-- url: string (nullable = true)



In [8]:
result = sqlContext.sql("Select * from GHTORRENT_RECORDS limit 10")

In [9]:
result.show()

+----------+-------------+-------------+--------------+---------------+--------------------+--------------------+
|access_key|downloader_id|logging_level|request_status|retrieval_stage|           timestamp|                 url|
+----------+-------------+-------------+--------------+---------------+--------------------+--------------------+
|      NULL|           31|         INFO|          NULL|      ghtorrent|2017-03-22T20:11:...|https://api.githu...|
|      NULL|           30|        DEBUG|          NULL|      retriever|2017-03-23T11:15:...|https://api.githu...|
|      NULL|           35|        DEBUG|          NULL|      ghtorrent|2017-03-22T20:15:...|                NULL|
|      NULL|           49|        DEBUG|          NULL|      ghtorrent|2017-03-24T12:29:...|                NULL|
|      NULL|            8|        DEBUG|          NULL|      retriever|2017-03-23T09:00:...|https://api.githu...|
|      NULL|           50|        DEBUG|          NULL|      ghtorrent|2017-03-24T10:52:

## Queries

In [10]:
query2 = "SELECT COUNT(*) FROM GHTORRENT_RECORDS"
result2 = sqlContext.sql(query2)
print("How many records does the table contain?") 
result2.show()

How many records does the table contain?
+--------+
|count(1)|
+--------+
| 9669634|
+--------+



In [11]:
query3 = "SELECT COUNT(*) FROM GHTORRENT_RECORDS WHERE LOGGING_LEVEL='WARN'"
result3 = sqlContext.sql(query3)
print("Count the number of WARNing messages.")
result3.show()

+--------+
|count(1)|
+--------+
|  132158|
+--------+



In [12]:
# 78588
query4 = '''SELECT COUNT(DISTINCT URL) 
               FROM GHTORRENT_RECORDS 
               WHERE RETRIEVAL_STAGE = 'api_client' AND URL != 'NULL' '''
result4 = sqlContext.sql(query4)
print("How many repositories were processed in total?")
result4.show()

How many repositories were processed in total?
+-------------------+
|count(DISTINCT URL)|
+-------------------+
|              78369|
+-------------------+



In [13]:
query5 = """SELECT COUNT(RETRIEVAL_STAGE), DOWNLOADER_ID
            FROM GHTORRENT_RECORDS 
            WHERE RETRIEVAL_STAGE='api_client' AND URL != 'NULL'
            GROUP BY DOWNLOADER_ID
            ORDER BY COUNT(RETRIEVAL_STAGE) DESC 
            LIMIT 10"""
result5 = sqlContext.sql(query5)
result5.show()

+----------------------+-------------+
|count(RETRIEVAL_STAGE)|DOWNLOADER_ID|
+----------------------+-------------+
|                 85528|           13|
|                 19046|            4|
|                 18948|           18|
|                 18926|           10|
|                 18911|           40|
|                 18616|           39|
|                 18614|           38|
|                 18604|           47|
|                 18463|            1|
|                 18452|           24|
+----------------------+-------------+



In [14]:
# [(79623, '13'), (1378, '21'), (1134, '40'), (368, '18'), (357, '42'), (356, '9'), (352, '4'), (342, '25'), (333, '22'), (332, '6')]
query6 = """SELECT COUNT(RETRIEVAL_STAGE), DOWNLOADER_ID
            FROM GHTORRENT_RECORDS 
            WHERE RETRIEVAL_STAGE='api_client' AND REQUEST_STATUS='Failed'
            GROUP BY DOWNLOADER_ID
            ORDER BY COUNT(RETRIEVAL_STAGE) DESC 
            LIMIT 10"""
result6 = sqlContext.sql(query6)
result6.show()

+----------------------+-------------+
|count(RETRIEVAL_STAGE)|DOWNLOADER_ID|
+----------------------+-------------+
|                 79623|           13|
|                  1378|           21|
|                  1134|           40|
|                   368|           18|
|                   357|           42|
|                   356|            9|
|                   352|            4|
|                   342|           25|
|                   333|           22|
|                   332|            6|
+----------------------+-------------+



In [15]:
# [(2662487, '10')]
query7 = """
            SELECT NEWT.TS
            FROM (SELECT LOGGING_LEVEL, SUBSTRING(TIMESTAMP, 12, 2) AS TS, DOWNLOADER_ID, RETRIEVAL_STAGE, REQUEST_STATUS, URL, ACCESS_KEY
                  FROM GHTORRENT_RECORDS) AS NEWT
            GROUP BY NEWT.TS
            ORDER BY COUNT(NEWT.TS) DESC
            LIMIT 1
            """
result7 = sqlContext.sql(query7)
result7.show()

+---+
| TS|
+---+
| 10|
+---+



In [16]:
# [('https://api.github.com/repos/greatfakeman/Tabchi', 79539)]
query8 = '''
        SELECT  URL
        FROM GHTORRENT_RECORDS
        WHERE URL != 'NULL'
        GROUP BY URL
        ORDER BY COUNT(URL) DESC
        LIMIT 1
        '''
result8 = sqlContext.sql(query8)
result8.show()

+--------------------+
|                 URL|
+--------------------+
|https://api.githu...|
+--------------------+



In [17]:
# [('ac6168f8776', 79623), ('46f11b5791b', 1340), ('9115020fb01', 1134), ('c1240f63b5b', 371), ('2776f3ba0a5', 368)]
query9 = '''
        SELECT  ACCESS_KEY, COUNT(ACCESS_KEY) AS key_count
        FROM GHTORRENT_RECORDS
        WHERE ACCESS_KEY != 'NULL'
        GROUP BY ACCESS_KEY
        ORDER BY key_count DESC
        LIMIT 5
        '''
result9 = sqlContext.sql(query9)
result9.show()

+-----------+---------+
| ACCESS_KEY|key_count|
+-----------+---------+
|ac6168f8776|    79623|
|46f11b5791b|     1340|
|9115020fb01|     1134|
|c1240f63b5b|      371|
|2776f3ba0a5|      368|
+-----------+---------+



### Table 2 Queries


In [18]:
query10 = "SELECT COUNT(*) FROM IMPORTANT_REPOS"
result10 = sqlContext.sql(query10)
print("How many records does the IMPORTANT_REPOS table contain?")
result10.show()

How many records does the IMPORTANT_REPOS table contain?
+--------+
|count(1)|
+--------+
|    1435|
+--------+



In [19]:
query11 = '''
            SELECT COUNT(ir.URL)
            FROM GHTORRENT_RECORDS gt
            INNER JOIN IMPORTANT_REPOS ir
                ON gt.URL != 'NULL' AND gt.URL=ir.URL
            '''
result11 = sqlContext.sql(query11)
print("How many records in the log file refer to entries in the interesting file?")
result11.show()

How many records in the log file refer to entries in the interesting file?


----------------------------------------
Exception happened during processing of request from ('127.0.0.1', 51040)
ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/opt/spark/python/lib/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", line 1188, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/spark/python/lib/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", line 1014, in send_command
    response = connection.send_command(command)
  File "/opt/spark/python/lib/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", line 1193, in send_command
    "Error while receiving", e, proto.ERROR_ON_RECEIVE)
py4j.protocol.Py4JNetworkError: Error while receiving
Traceback (most recent call last):
  File "/usr/lib/python3.6/socketserver.py", line 320, in _handle_request_nobl

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:45597)
Traceback (most recent call last):
  File "/home/kvats/.local/lib/python3.6/site-packages/IPython/core/interactiveshell.py", line 3331, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "<ipython-input-19-c8e20c7fd450>", line 9, in <module>
    result11.show()
  File "/opt/spark/python/pyspark/sql/dataframe.py", line 407, in show
    print(self._jdf.showString(n, 20, vertical))
  File "/opt/spark/python/lib/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", line 1286, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/opt/spark/python/pyspark/sql/utils.py", line 98, in deco
    return f(*a, **kw)
  File "/opt/spark/python/lib/py4j-0.10.8.1-src.zip/py4j/protocol.py", line 336, in get_return_value
    format(target_id, ".", name))
py4j.protocol.Py4JError: An error occurred while calling o113.showString

During handling of the above exceptio

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:45597)
Traceback (most recent call last):
  File "/home/kvats/.local/lib/python3.6/site-packages/IPython/core/interactiveshell.py", line 3331, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "<ipython-input-19-c8e20c7fd450>", line 9, in <module>
    result11.show()
  File "/opt/spark/python/pyspark/sql/dataframe.py", line 407, in show
    print(self._jdf.showString(n, 20, vertical))
  File "/opt/spark/python/lib/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", line 1286, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/opt/spark/python/pyspark/sql/utils.py", line 98, in deco
    return f(*a, **kw)
  File "/opt/spark/python/lib/py4j-0.10.8.1-src.zip/py4j/protocol.py", line 336, in get_return_value
    format(target_id, ".", name))
py4j.protocol.Py4JError: An error occurred while calling o113.showString

During handling of the above exceptio

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:45597)
Traceback (most recent call last):
  File "/home/kvats/.local/lib/python3.6/site-packages/IPython/core/interactiveshell.py", line 3331, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "<ipython-input-19-c8e20c7fd450>", line 9, in <module>
    result11.show()
  File "/opt/spark/python/pyspark/sql/dataframe.py", line 407, in show
    print(self._jdf.showString(n, 20, vertical))
  File "/opt/spark/python/lib/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", line 1286, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/opt/spark/python/pyspark/sql/utils.py", line 98, in deco
    return f(*a, **kw)
  File "/opt/spark/python/lib/py4j-0.10.8.1-src.zip/py4j/protocol.py", line 336, in get_return_value
    format(target_id, ".", name))
py4j.protocol.Py4JError: An error occurred while calling o113.showString

During handling of the above exceptio

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:45597)
Traceback (most recent call last):
  File "/home/kvats/.local/lib/python3.6/site-packages/IPython/core/interactiveshell.py", line 3331, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "<ipython-input-19-c8e20c7fd450>", line 9, in <module>
    result11.show()
  File "/opt/spark/python/pyspark/sql/dataframe.py", line 407, in show
    print(self._jdf.showString(n, 20, vertical))
  File "/opt/spark/python/lib/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", line 1286, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/opt/spark/python/pyspark/sql/utils.py", line 98, in deco
    return f(*a, **kw)
  File "/opt/spark/python/lib/py4j-0.10.8.1-src.zip/py4j/protocol.py", line 336, in get_return_value
    format(target_id, ".", name))
py4j.protocol.Py4JError: An error occurred while calling o113.showString

During handling of the above exceptio

Py4JError: An error occurred while calling o113.showString

In [None]:
query12 = '''
            SELECT ir.URL, COUNT(ir.URL)
            FROM GHTORRENT_RECORDS gt
            INNER JOIN IMPORTANT_REPOS ir
            ON gt.URL != 'NULL' AND gt.URL=ir.URL AND gt.REQUEST_STATUS = 'Failed' 
            GROUP BY ir.URL
            ORDER BY COUNT(ir.URL) DESC
            LIMIT 5
            '''
result12 = sqlContext.sql(query12)
print("Which of the interesting repositories has the most failed API calls?")
result12.show()
# print("TimeTaken: {} ms".format(end_time-start_time))