In [1]:
import time
from datetime import timedelta

import html
import re

import numpy as np
import pandas as pd
import xml.etree.ElementTree as et

import dask
import dask.dataframe as dd
import dask.bag as bd
from dask.distributed import Client
from dask_jobqueue import SLURMCluster

In [2]:
##########################################################
#Step 1. Format 'pmd_rules_results.xml' from Linux terminal#
##########################################################
# -'pmd_rules_results.xml' is not properly formatted
# -We used the xmllint command to pretty-print 'pmd_rules_results.xml' and save it as 'pmd_rules_results_fmt.xml'
# $  xmllint --format pmd_rules_results.xml > pmd_rules_results_fmt.xml
##############################################################################
#Step 2. Remove some lines in 'pmd_rules_results_fmt.xml' from Linux terminal#
##############################################################################
# remove lines 1 to 2 and the last line of the 'pmd_rules_results_fmt.xml'
# MacOS $  sed -i '' '1,2d;$d' pmd_rules_results_fmt.xml
# Linux  $  sed -i '1,2d;$d' pmd_rules_results_fmt.xml   (GNU sed: -i takes no separate suffix argument)
# blocksize breaks the file into partitions
#pmd_rules_result_bd = bd.read_text('pmd_rules_results_fmt.xml', blocksize='10MB')
# Use '</violation>\n' as linedelimiter so that the text is split after each closing </violation> tag
# (the intent is that each bag element holds one <violation ...> ... </violation> record)
pmd_rules_result_bd = bd.read_text('pmd_rules_results_fmt.xml', blocksize='5MB', linedelimiter='</violation>\n')
# Alternative settings that were tried:
#pmd_rules_result_bd = bd.read_text('pmd_rules_results_fmt.xml', blocksize=1e8, linedelimiter='</violation>\n')
#pmd_rules_result_bd = bd.read_text('pmd_rules_results_fmt.xml', blocksize=None, linedelimiter='</violation>\n')
#pmd_rules_result_bd = bd.read_text('pmd_rules_results.xml', linedelimiter='</violation>\n')

In [3]:
# check to see number of partitions
# find the number of partitions
#pmd_rules_result_bd.npartitions # or
pmd_rules_result_bd

dask.bag<bag-from-delayed, npartitions=1116>

In [4]:
# look at the first row
pmd_rules_result_bd.take(1)



In [5]:
# look at the first 50 row
#pmd_rules_result_bd.take(50)

### We are interested in getting the 'violation' elements
- so we can get rid of the `<file>` wrapper tags
- then filter down to the `<violation>` rows

In [6]:
# Remove the opening '<file ...>' and closing '</file>' tags (they are replaced with an empty string)
# so that what remains of each element can be parsed on its own with xml.etree.ElementTree
violations_rows_bd = pmd_rules_result_bd.map(lambda line: re.sub(r'\s*<\s*file.*>|.*/file>', '', line))

In [7]:
# look at the first row
violations_rows_bd.take(1)



In [8]:
# look at the first 50 row
#violations_rows_bd.take(50)

In [9]:
# Keep only the elements that contain a '<violation' opening tag.
# NOTE: this used to be written as line.find('<violation' or '< violation') >= 0, but the `or` of two
# non-empty strings always evaluates to the first one, so only '<violation' was ever searched for.
# '< violation' (space after '<') is not well-formed XML and would be rejected by ElementTree later,
# so the dead alternative is dropped rather than "fixed".
# Regex can also be applied here
#violations_rows_top_bd = violations_rows_bd.filter(lambda line: re.findall(r'\s*<violation', line))
violations_rows_bd = violations_rows_bd.filter(lambda line: '<violation' in line)

In [10]:
# look at the first row
violations_rows_bd.take(1)



In [11]:
# look at the first 50 row
#violations_rows_bd.take(50)

### Use ElementTree to get all the attributes and text in XML or HTML tags

In [12]:
# This will give us just the attributes in the violation tag: <violation ...>
#violations_rows_bd = violations_rows_bd.map(lambda row: et.fromstring(row).attrib)

In [13]:
# This will give us just the text enclosed in the violation tag: <violation ...>
#violations_rows_bd = violations_rows_bd.map(lambda row: {'text':et.fromstring(row).text})

- To get both the attributes and the text in the violation tag
    - We use add_text_to_dict_attrb() to achieve that

In [14]:
def add_text_to_dict_attrb(dictionary, txt):
    """Store an element's text under the 'text' key of its attribute dict.

    Parameters
    ----------
    dictionary : dict
        Attribute mapping of an XML element; it is modified in place.
    txt : str or None
        Text content of the element. ``None`` (what ElementTree reports for
        an element without text) is stored as an empty string.

    Returns
    -------
    dict
        The same ``dictionary`` object, now containing a 'text' entry with
        leading/trailing newlines stripped.
    """
    # Element.text is None when the tag has no text; calling .strip() on it would raise
    dictionary['text'] = '' if txt is None else txt.strip('\n')
    return dictionary

In [15]:
# This will give us both the attributes in the violation tag and the text enclosed in the violation tag:
# <violation ...> ... </violation>
def _violation_row_to_dict(row):
    """Parse one '<violation ...>text</violation>' string into a dict of its attributes plus 'text'."""
    # Parse the row once and reuse the element for both the attributes and the text
    # (previously et.fromstring(row) was called twice for every row).
    element = et.fromstring(row)
    return add_text_to_dict_attrb(element.attrib, element.text)

violations_rows_bd = violations_rows_bd.map(_violation_row_to_dict)


In [16]:
# look at the first row
violations_rows_bd.take(1)

({'beginline': '1',
  'endline': '11',
  'begincolumn': '43',
  'endcolumn': '1',
  'rule': 'UseUtilityClass',
  'ruleset': 'Design',
  'class': 'Code_10000096_9914015_1959_0',
  'externalInfoUrl': 'https://pmd.github.io/pmd-6.39.0/pmd_rules_java_design.html#useutilityclass',
  'priority': '3',

In [17]:
# look at the first 50 row
#violations_rows_bd.take(50)

In [18]:
# further processing can be done to each item
# e.g. to remove 'beginline' that do not have anything in them
#violations_rows_bd = violations_rows_bd.filter(lambda x: x['beginline'] != '')

In [19]:
violations_rows_bd

dask.bag<lambda, npartitions=1116>

## Start a Dask cluster using SLURM jobs as workers.

There are a couple of things we need to configure here:

disabling the mechanism to write on disk when workers run out of memory,
memory, CPUs, maximum time and number of workers per SLURM job,
dask folders for log files and workers data.
We recommend putting the log folder and workers data folders in your /nesi/nobackup/<project_code> folder, most indicated for temporary files (see NeSI File Systems and Quotas).

All of these options can be set in configuration files, see Dask configuration and Dask jobqueue configuration for more information.

In [20]:
#http://jobqueue.dask.org/en/latest/generated/dask_jobqueue.SLURMCluster.html
# Disable the worker memory thresholds that trigger spilling data to disk.
dask.config.set(
    {
        "distributed.worker.memory.target": False,  # avoid spilling to disk
        "distributed.worker.memory.spill": False,  # avoid spilling to disk
    }
)
# Describe what ONE SLURM job looks like; the number of jobs is decided later (scale/adapt).
# NOTE(review): per the dask_jobqueue docs, `cores` and `memory` are totals for the whole job and
# `processes` splits the job into that many worker processes - so this looks like 2 workers per job,
# each with ~5 threads and ~4 GiB, not 1 worker with 10 cores and 8 GB. Confirm this is intended.
cluster = SLURMCluster(
    cores=10, #cores=24, # total cores (threads) requested per SLURM job
    processes=2,
    memory="8GiB",
    walltime="0-30:30",# walltime="0-00:30",  # presumably SLURM days-hours:minutes, i.e. 30 h 30 min - confirm
    log_directory="../dask/logs",  # folder for SLURM logs for each worker
    local_directory="../dask",  # folder for workers data
)

Perhaps you already have a cluster running?
Hosting the HTTP server on port 32897 instead


In [21]:
#cluster.adapt(minimum_jobs=20, maximum_jobs=200)
cluster.adapt(minimum_jobs=10, maximum_jobs=200)
client = Client(cluster)
client

0,1
Client  Scheduler: tcp://192.168.94.163:41773  Dashboard: http://192.168.94.163:32897/status,Cluster  Workers: 0  Cores: 0  Memory: 0 B


## Removing empty partitions from the Bag
- The challenge is that some empty partitions are created in the Bag
    - This may be a result of some of the preprocessing steps carried out on the Bag
    - e.g. filtering
    - hence the workers freeze at some point because they cannot handle empty partitions
    - One way I tried to solve this is to read the partitions one by one and write to a CSV file those partitions that are not empty
        - See s1_read_pmdruleresultxml_get_violations_v2
    - Also try the solution here:
  https://stackoverflow.com/questions/47812785/remove-empty-partitions-in-dask 
  - Also check out: https://stackoverflow.com/questions/61670990/dask-dataframe-filter-and-repartition-gives-some-empty-partitions?rq=1

In [22]:
# Gets Rid of the empty partitions in Bags
def cull_empty_partitions(bag):
    """
    Return a new bag built from only the non-empty partitions of `bag`.

    A bag derived from another one by filtering or grouping keeps the
    parent's partition count, even when many of those partitions end up
    empty. The empty ones only add overhead, so they are discarded here.

    The bag is persisted first, then the size of every partition is
    computed, and finally a bag is rebuilt from the partitions whose
    size is greater than zero.
    """
    def count_items(partition):
        # A partition coming out of bag.filter() is a 'filter' object,
        # which has no __len__, so turn it into a list before counting.
        if not hasattr(partition, '__len__'):
            partition = list(partition)
        return len(partition)

    bag = bag.persist()
    sizes = bag.map_partitions(count_items).compute()

    # Pair every partition size with the matching 'delayed' partition and
    # keep only the delayed objects whose partition holds at least one item.
    non_empty = (
        delayed_part
        for size, delayed_part in zip(sizes, bag.to_delayed())
        if size > 0
    )

    # Rebuild a Bag from the surviving delayed partitions.
    return dask.bag.from_delayed(non_empty)

In [23]:
violations_rows_bd = cull_empty_partitions(violations_rows_bd)

KilledWorker: ("('bag-from-delayed-decode-filter-lambda-read-block-lambda-5eda3f47f3cdec7f1cfb2b0ff274e7c3', 406)", <Worker 'tcp://10.65.4.49:32853', name: 55-1, memory: 0, processing: 1>)

In [None]:
violations_rows_bd

## Convert Bags to Dataframes

In [None]:
#df = violations_rows_bd.to_dataframe()
# Column name -> dtype for the resulting dataframe, listed in column order
violation_meta = {
    'beginline': 'int',
    'endline': 'int',
    'begincolumn': 'int',
    'endcolumn': 'int',
    'rule': 'str',
    'ruleset': 'str',
    'class': 'str',
    'externalInfoUrl': 'str',
    'priority': 'int',
    'text': 'str',
}
df = violations_rows_bd.to_dataframe(meta=violation_meta)

In [None]:
# View Stucture
df

In [None]:
#df.partitions[1].compute()

In [None]:
df.columns

In [None]:
#Index will not be particularly meaningful. Use reindex afterwards if necessary.
df.index

In [None]:
df.dtypes

In [None]:
df.npartitions

## Removing Empty partitions from the dataframe

In [None]:
# Repartition
def _rebalance_ddf(ddf):
    """Repartition dask dataframe to ensure that partitions are roughly equal size.

    Assumes `ddf.index` is already sorted.

    Parameters
    ----------
    ddf : dask.dataframe.DataFrame
        Frame to rebalance. If its divisions are unknown, the index is
        re-set with ``sorted=True`` first so that divisions get computed.
        NOTE(review): a frame built from a Bag may not have a globally
        sorted index - confirm before relying on this for such frames.

    Returns
    -------
    dask.dataframe.DataFrame
        The frame repartitioned on divisions derived from the per-partition
        index value counts, with the same number of partitions requested.
        When the input index was unnamed and divisions were unknown, the
        resulting index carries the column name produced by ``reset_index()``.
    """
    if not ddf.known_divisions:  # e.g. for read_parquet(..., infer_divisions=False)
        # reset_index() inserts the old index as the first column. Picking that column by
        # position (instead of by ddf.index.name) also works when the index is unnamed,
        # where ddf.index.name is None and set_index(None, ...) would fail.
        ddf_reset = ddf.reset_index()
        index_column = ddf_reset.columns[0]
        ddf = ddf_reset.set_index(index_column, sorted=True)
    # Count how many rows carry each index value (requires a compute over all partitions)
    index_counts = ddf.map_partitions(lambda _df: _df.index.value_counts().sort_index()).compute()
    # Expand the counts back into the full sorted index so division points can be chosen from it
    index = np.repeat(index_counts.index, index_counts.values)
    divisions, _ = dd.io.io.sorted_division_locations(index, npartitions=ddf.npartitions)
    return ddf.repartition(divisions=divisions)

In [None]:
# remove empty partitions 
#df = cull_empty_partitions(df) # remove empties

In [None]:
df = _rebalance_ddf(df)       # re-size

In [None]:
df

In [None]:
#if not df.empty:
df.shape[0].compute()

In [None]:
#replace new lines with empty strings
#df = df.replace('\\n','', regex=True)

In [None]:
#df.shape[0].compute()

In [None]:
# drop empty columns
#df = df[~df['class'].isna()] # Drop rows that have NaN in the Code column

In [None]:
#df.shape[0].compute()

In [None]:
# drop classes that do not have this pattern
#df = df[df['class'].str.contains('^Code_\d+_\d+_\d+_\d+', regex=True)]

In [None]:
# select multiple columns
#df1 = df[['beginline', 'endline', 'begincolumn', 'endcolumn', 'rule', 'ruleset','class', 'externalInfoUrl', 'priority', 'text']]
#df = df[['class', 'rule', 'ruleset', 'text']]

In [None]:
#df.shape[0].compute()

In [None]:
#grp_df = df.groupby(['class', 'rule', ])

In [None]:
#grp_df

In [None]:
# Let's print the first entries
# in all the groups formed.
#grp_df.first().compute()

In [None]:
# List all the group keys
#list(grp_df.groups.keys())

## Save all the codes from the posts into a CSV file

In [None]:
## Save all the codes from the posts into a JSON file
#violations_rows_bd.to_textfiles('pmdcodesnippets_json/PMDJavaCodeSnippets*.json.gz', encoding='utf-8')
#violations_rows_bd.to_textfiles('pmdcodesnippets_json/PMDJavaCodeSnippets*.json', encoding='utf-8')
## Save all the codes from the posts into a CSV file
# Save to a CSV file
#df.to_csv('pmdcodesnippetsviolation_csv/PMDJavaCodeSnippetsViolation*.csv', sep=',', index=False)

distributed.utils - ERROR - Timed out trying to connect to tcp://10.65.4.49:32853 after 10 s
Traceback (most recent call last):
  File "/opt/nesi/CS400_centos7_bdw/Python/3.9.5-gimkl-2020a/lib/python3.9/site-packages/distributed/comm/tcp.py", line 379, in connect
    stream = await self.client.connect(
  File "/opt/nesi/CS400_centos7_bdw/Python/3.9.5-gimkl-2020a/lib/python3.9/site-packages/tornado/tcpclient.py", line 275, in connect
    af, addr, stream = await connector.start(connect_timeout=timeout)
asyncio.exceptions.CancelledError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/nesi/CS400_centos7_bdw/Python/3.9.5-gimkl-2020a/lib/python3.9/asyncio/tasks.py", line 492, in wait_for
    fut.result()
asyncio.exceptions.CancelledError

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/opt/nesi/CS400_centos7_bdw/Python/3.9.5-gimkl-2020a/lib/python3.9/site-p