In [1]:
import time
from datetime import timedelta

import html
import re

import os
import ntpath

import subprocess as sp

import numpy as np
import pandas as pd
import xml.etree.ElementTree as et

import dask
import dask.dataframe as dd
import dask.bag as bd
from dask.distributed import Client
from dask_jobqueue import SLURMCluster

## Enter full path to CSV file

In [2]:
## Enter full path to CSV file

#common_path = '../../../../stackexchange_v2/workspace/input'
common_path = '../SO_data_dump'

## Enter Relative Path to csv file
relative_path = 'javaanswers_csv/JavaAnswers*.csv'

filepath_javaanswers = '{}/{}'.format(common_path, relative_path)

## Read Java Answers csv files 

In [3]:
ddf_javaanswers = dd.read_csv(filepath_javaanswers, engine='python', error_bad_lines=False, warn_bad_lines=False, dtype=object)

## Explore Posts and Java Posts

In [4]:
ddf_javaanswers

Unnamed: 0_level_0,Id,PostTypeId,AcceptedAnswerId,CreationDate,Score,ViewCount,Body,OwnerUserId,LastEditorUserId,LastEditorDisplayName,LastEditDate,LastActivityDate,Title,Tags,AnswerCount,CommentCount,FavoriteCount,CommunityOwnedDate,ContentLicense,ParentId,DeletionDate,OwnerDisplayName,ClosedDate
npartitions=9354,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1,Unnamed: 22_level_1,Unnamed: 23_level_1
,object,object,object,object,object,object,object,object,object,object,object,object,object,object,object,object,object,object,object,object,object,object,object
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...


In [5]:
# Chech the number of partitions
ddf_javaanswers.npartitions

9354

In [6]:
ddf_javaanswers.columns

Index(['Id', 'PostTypeId', 'AcceptedAnswerId', 'CreationDate', 'Score',
       'ViewCount', 'Body', 'OwnerUserId', 'LastEditorUserId',
       'LastEditorDisplayName', 'LastEditDate', 'LastActivityDate', 'Title',
       'Tags', 'AnswerCount', 'CommentCount', 'FavoriteCount',
       'CommunityOwnedDate', 'ContentLicense', 'ParentId', 'DeletionDate',
       'OwnerDisplayName', 'ClosedDate'],
      dtype='object')

In [7]:
ddf_javaanswers = ddf_javaanswers.partitions[0:10]

In [8]:
#ddf_javaanswers.head(2)

## Start a Dask cluster using SLURM jobs as workers.

In [9]:
#http://jobqueue.dask.org/en/latest/generated/dask_jobqueue.SLURMCluster.html
dask.config.set(
    {
        "distributed.worker.memory.target": False,  # avoid spilling to disk
        "distributed.worker.memory.spill": False,  # avoid spilling to disk
    }
)
cluster = SLURMCluster(
    cores=10, #cores=24, # we set each job to have 1 Worker, each using 10 cores (threads) and 8 GB of memory
    processes=2,
    memory="8GiB",
    walltime="0-30:30",# walltime="0-00:30",
    log_directory="../dask/logs",  # folder for SLURM logs for each worker
    local_directory="../dask",  # folder for workers data
)

Spawn between 20 to 100 workers and connect a client to be able use them.

In [10]:
#cluster.scale(n=20) # ask for 20 jobs or workers
# This also works with adaptive clusters. This automatically launches and kill workers based on load.
# we tell our cluster to autoscale between 10 and 20 workers depending on the load
cluster.adapt(minimum_jobs=50, maximum_jobs=200)
#cluster.adapt(maximum_jobs=20)
client = Client(cluster)
client

0,1
Client  Scheduler: tcp://192.168.94.154:38114  Dashboard: http://192.168.94.154:8787/status,Cluster  Workers: 0  Cores: 0  Memory: 0 B


## 1st Phase Computation 
- Join two columns Post 'Id' and 'ParentId' to PIdx
- Set 'PIdx' as the new index

In [11]:
ddf_javaanswers = ddf_javaanswers.reset_index()
ddf_javaanswers['PIdx'] = ddf_javaanswers['Id'].str.cat(ddf_javaanswers['ParentId'],sep="_")
#Join two columns 'index' to 'Id' and 'ParentId' 
ddf_javaanswers['Idx'] = ddf_javaanswers['PIdx'].str.cat(ddf_javaanswers['index'].astype(str),sep="_")
ddf_javaanswers = ddf_javaanswers.set_index('Idx')

## 2nd Phase Computation 
- Extract all the code form the answers
- result gives two options
    - 1st is enclsed in the code tags
    - 2nd collets the code from the code tags

In [12]:
ddf_javarawcode = ddf_javaanswers.Body.str.extractall(r'(<code>(.|\n|\r\n)*?<\/code>)')
#ddf_javarawcode = ddf_javaanswers.Body.str.extractall(r'(<code[^>]*>((?:.|\s)*?)</code>)')
#https://stackoverflow.com/questions/51212480/regex-for-mask-function-in-dask

#triggers the compute job but it will keep it on the works without retrieving resulte
#ddf_javarawcode = ddf_javarawcode.persist()

In [13]:
# rename column 0 to 'code_in_tags' and column 1 to 'Code'
ddf_javarawcode = ddf_javarawcode.rename(columns={0: 'Code', 1: 'Others'}) 

In [14]:
ddf_javarawcode.columns

Index(['Code', 'Others'], dtype='object')

In [15]:
# retrieve just the column 1 in a form of dataferam
ddf_javarawcode = ddf_javarawcode[['Code']].astype(str)

In [16]:
#https://stackoverflow.com/questions/60088353/convert-html-characters-to-strings-in-pandas-dataframe
#ddf_javarawcode = ddf_javarawcode.applymap(html.unescape)

In [17]:
ddf_javarawcode = ddf_javarawcode.reset_index() # to unstack the group by

In [18]:
#replace the <code> </code> Tags in the code column with empty string
ddf_javarawcode['Code'] = ddf_javarawcode.Code.str.replace(r'(<code>)|(<\/code>)', r'')

  out = getattr(getattr(obj, accessor, obj), attr)(*args, **kwargs)


## 4th Phase Computation 
- Check Lengths

In [19]:
#Get the Length of initial java answers
init_javaanswers_len = len(ddf_javaanswers.index) #####

In [20]:
#Get the Length after raw codes are extracted
javarawcode_len = len(ddf_javarawcode.index)

In [21]:
#Get the Number of Answers related to Java Post
print("Initial Number of Java Post: {}".format(init_javaanswers_len))

Initial Number of Java Post: 5800


In [22]:
#Get the Length after raw codes are extracted 
# because the regex extractall() gets multiple <code>...<\code> matches in just one post
# the javarawcode_len is expected to be more
print("Number after Java code after extraction: {}".format(javarawcode_len))

Number after Java code after extraction: 4284


## 5th Phase Computation 
- Save ddf_javarawcode into a csv file

### Make a folder in that directory

In [23]:
print(folder)

NameError: name 'folder' is not defined

### Save files in that directory

## Write the java codes into saparate files, and name each file according to the public class