# 5. Using PySpark, sample some Python and Non-Python questions from main repo

## 5.0 Mount drive and load repo
Returns variables **q** (questions tagged Python) and **o** (other questions).


In [1]:
from google.colab import drive
# Mount Google Drive so the paths under /content/gdrive used below are reachable.
drive.mount('/content/gdrive')

Mounted at /content/gdrive


In [2]:
import os
import sys
import time
# Wall-clock reference; the last cell prints the seconds elapsed since this point.
start = time.time()

# Project folder on the mounted Drive; it becomes the working directory and
# is reused later as the base path for the CSV output.
folderpath = '/content/gdrive/My Drive/iss/Capstone/PLP Project/iTechQnA'
os.chdir(folderpath)

In [3]:
!pip install pyspark

os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

Collecting pyspark
  Downloading pyspark-3.2.0.tar.gz (281.3 MB)
[K     |████████████████████████████████| 281.3 MB 36 kB/s 
[?25hCollecting py4j==0.10.9.2
  Downloading py4j-0.10.9.2-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 51.4 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.2.0-py2.py3-none-any.whl size=281805912 sha256=5efbd7cb90cbdeea79e60d762d117cb1f3cd917a69771b75cd2d650129fc9e82
  Stored in directory: /root/.cache/pip/wheels/0b/de/d2/9be5d59d7331c6c2a7c1b6d1a4f463ce107332b1ecd4e80718
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.2 pyspark-3.2.0


In [4]:
%%time
q = spark.read.format("json").load('/content/gdrive/My Drive/iss/Capstone/PLP Project/Dataset/data/q*txt')
q.printSchema()

root
 |-- backoff: long (nullable = true)
 |-- has_more: boolean (nullable = true)
 |-- items: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- accepted_answer_id: long (nullable = true)
 |    |    |-- answer_count: long (nullable = true)
 |    |    |-- body: string (nullable = true)
 |    |    |-- bounty_amount: long (nullable = true)
 |    |    |-- bounty_closes_date: long (nullable = true)
 |    |    |-- closed_date: long (nullable = true)
 |    |    |-- closed_reason: string (nullable = true)
 |    |    |-- community_owned_date: long (nullable = true)
 |    |    |-- content_license: string (nullable = true)
 |    |    |-- creation_date: long (nullable = true)
 |    |    |-- is_answered: boolean (nullable = true)
 |    |    |-- last_activity_date: long (nullable = true)
 |    |    |-- last_edit_date: long (nullable = true)
 |    |    |-- link: string (nullable = true)
 |    |    |-- locked_date: long (nullable = true)
 |    |    |-- migrated_fro

In [5]:
%%time
o = spark.read.format("json").load('/content/gdrive/My Drive/iss/Capstone/PLP Project/Dataset/data/o*txt')
o.printSchema()

root
 |-- backoff: long (nullable = true)
 |-- has_more: boolean (nullable = true)
 |-- items: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- accepted_answer_id: long (nullable = true)
 |    |    |-- answer_count: long (nullable = true)
 |    |    |-- body: string (nullable = true)
 |    |    |-- bounty_amount: long (nullable = true)
 |    |    |-- bounty_closes_date: long (nullable = true)
 |    |    |-- closed_date: long (nullable = true)
 |    |    |-- closed_reason: string (nullable = true)
 |    |    |-- community_owned_date: long (nullable = true)
 |    |    |-- content_license: string (nullable = true)
 |    |    |-- creation_date: long (nullable = true)
 |    |    |-- is_answered: boolean (nullable = true)
 |    |    |-- last_activity_date: long (nullable = true)
 |    |    |-- last_edit_date: long (nullable = true)
 |    |    |-- link: string (nullable = true)
 |    |    |-- locked_date: long (nullable = true)
 |    |    |-- migrated_fro

## 5.1 Sample data
Output: **qo.csv** — the combined sample of both question sets.


### 5.1.0 Set intervals

In [6]:
# Sampling intervals in days (multiplied by 60*60*24 when the sample dates are generated):
# the larger the interval, the fewer dates are sampled and the smaller the dataset.
interval_q = 30
interval_o = 100 ## o was downloaded 1 out of 10 days from the beginning, so this has to be a multiple of 10

### 5.1.1 Select columns

In [7]:
def select_req(df, lbl):
  """
  Flatten the `items` array of `df` and keep only the columns needed for sampling.

  df: full data frame with an `items` array column
  lbl: label = 0 if from others, 1 for questions (tagged python)

  Returns a data frame with one row per item and the columns
  question_id, creation_date (date), year (string), title, body and label.
  As a side effect, prints the row count (this triggers a Spark job) and the schema.
  """

  # One output row per element of the `items` array.
  exploded = df.withColumn('items', F.explode('items'))

  # Both derived columns come from the item's epoch-seconds creation time.
  created_on = F.to_date(F.from_unixtime('items.creation_date', "yyyy-MM-dd"))
  created_year = F.from_unixtime('items.creation_date', "yyyy")

  selected = exploded.select(
      'items.question_id',
      created_on.alias('creation_date'),
      created_year.alias('year'),
      F.col('items.title').alias('title'),
      F.col('items.body').alias('body'),
  ).withColumn('label', F.lit(lbl))

  print(f'Original no. of rows == {selected.count()}')
  selected.printSchema()

  return selected

In [8]:
%%time
# Flatten both raw frames to the required columns; label 1 marks the q set, label 0 the o set.
q_sub = select_req(q, 1)
o_sub = select_req(o, 0)

Original no. of rows == 878471
root
 |-- question_id: long (nullable = true)
 |-- creation_date: date (nullable = true)
 |-- year: string (nullable = true)
 |-- title: string (nullable = true)
 |-- body: string (nullable = true)
 |-- label: integer (nullable = false)

Original no. of rows == 295605
root
 |-- question_id: long (nullable = true)
 |-- creation_date: date (nullable = true)
 |-- year: string (nullable = true)
 |-- title: string (nullable = true)
 |-- body: string (nullable = true)
 |-- label: integer (nullable = false)

CPU times: user 329 ms, sys: 44.4 ms, total: 373 ms
Wall time: 56.9 s


### 5.1.2 Generate sample dates

In [9]:
def generate_series(start, stop, interval):
    # https://newbedev.com/sparksql-on-pyspark-how-to-generate-time-series
    """
    Build a data frame of evenly spaced sample dates.

    :param start  - lower bound, inclusive
    :param stop   - upper bound, exclusive
    :interval int - increment interval in seconds

    Returns a data frame with a `sample_date` date column and a constant
    `sample` column holding 'Y'.
    """

    # Let Spark convert both bounds to epoch seconds.
    bounds_df = spark.createDataFrame([(start, stop)], ("start", "stop"))
    epoch_cols = [
        F.col(name).cast("timestamp").cast("long") for name in ("start", "stop")
    ]
    first_epoch, last_epoch = bounds_df.select(epoch_cols).first()

    # One row per step between the bounds, kept as epoch seconds.
    epoch_df = spark.range(first_epoch, last_epoch, interval).select(
        F.col("id").cast("timestamp").cast("long").alias("value")
    )

    sample_date = F.to_date(F.from_unixtime('value', "yyyy-MM-dd")).alias('sample_date')
    sample_flag = F.lit('Y').alias('sample')
    return epoch_df.select(sample_date, sample_flag)

In [10]:
# Sample dates for each set: same date window, stepped by the set's own interval (days -> seconds).
seconds_per_day = 60*60*24
window_start, window_stop = '2018-01-01', '2021-07-04'
sdq = generate_series(window_start, window_stop, seconds_per_day*interval_q)
sdo = generate_series(window_start, window_stop, seconds_per_day*interval_o)

### 5.1.3 Select samples only

In [12]:
  def select_samples(sub_df, sample_dates):
    """Keep only the rows of `sub_df` created on one of the sampled dates.

    sub_df: data frame with `creation_date` and `question_id` columns
    sample_dates: data frame with `sample_date` and `sample` columns

    Returns the matching rows of `sub_df` plus the `sample` column, ordered
    by `question_id`. An inner join keeps exactly the rows that the previous
    left-join-then-filter on `sample` kept.
    """
    matched = sub_df.join(sample_dates,
                          sub_df.creation_date == sample_dates.sample_date,
                          'inner')
    return matched.select(sub_df['*'], sample_dates['sample']) \
                  .orderBy(F.col('question_id'))

  # select only questions samples
  q_sample = select_samples(q_sub, sdq)
  print(q_sample.count())
  q_sample.printSchema()

  # same selection for the others set
  o_sample = select_samples(o_sub, sdo)
  print(o_sample.count())
  o_sample.printSchema()

29613
root
 |-- question_id: long (nullable = true)
 |-- creation_date: date (nullable = true)
 |-- year: string (nullable = true)
 |-- title: string (nullable = true)
 |-- body: string (nullable = true)
 |-- label: integer (nullable = false)
 |-- sample: string (nullable = true)

29918
root
 |-- question_id: long (nullable = true)
 |-- creation_date: date (nullable = true)
 |-- year: string (nullable = true)
 |-- title: string (nullable = true)
 |-- body: string (nullable = true)
 |-- label: integer (nullable = false)
 |-- sample: string (nullable = true)



### 5.1.4 Union data

In [13]:
# Stack the two samples (union matches columns by position) and sort the result by question id.
qo_sample = q_sample.union(o_sample).orderBy(F.col('question_id'))
print(qo_sample.count())

59531


### 5.1.5 Save data as csv

In [14]:
# Create the output folder first: to_csv does not create missing directories,
# and failing here is cheaper than failing after the Spark job below has run.
output_dir = os.path.join(folderpath, 'sample_data')
os.makedirs(output_dir, exist_ok=True)

# Collect the combined sample to the driver as a pandas frame and write it as CSV.
df = qo_sample.toPandas()
df.to_csv(os.path.join(output_dir, 'qo.csv'), index=False)

In [15]:
# Print the seconds elapsed since `start` was taken, then shut the Spark session down.
end = time.time()
elapsed = end - start
print(elapsed)
spark.stop()

1379.2419693470001
