# CSYE7245_DGA_Detection Assignment4

## Part C.B - MLLIB AND MACHINE LEARNING


### Course Information
* **School**: College of Engineering, Northeastern University
* **Course Name**: Big-Data Systems and Intelligence Analytics
* **Professor**: Nik Brown

### Group Members
- Haimin Zhang
- Lixi Zhou
- Shiqi Dai

### Requirements
Apply an MLlib and machine-learning analysis to your project data.

In [2]:
import gc
import sys
import math
import collections
from tld import get_tld
import re
from publicsuffixlist import PublicSuffixList
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import IndexToString
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

In [3]:
# Read the labelled domain CSV with Spark and display it. Then keep only a
# pandas copy (used by the Python feature extractors) and release the Spark one.
mixed_domains = (
    sqlContext.read.format('csv')
    .options(header='true', inferSchema='true')
    .load('/FileStore/tables/mixed_domain.csv')
)
display(mixed_domains)
pandas_mixed_domains = mixed_domains.toPandas()
del mixed_domains
gc.collect()

DGA_family,Domain,Type
banjori,vrxhererwyatanb.com,DGA
banjori,rjdwmachuslazaroqok.com,DGA
none,mte.gov.br,Normal
none,animefangirl00.tumblr.com,Normal
none,seacenter.ir,Normal
Post,1xhkzo0vu7c96fwf07o1o9wjau.org,DGA
none,d-a-z.hr,Normal
none,asternic.net,Normal
rovnix,f8qlliz2qyitk5hmpl.biz,DGA
emotet,snpwmbhvvsnjoijr.eu,DGA


In [4]:

# Shared PublicSuffixList instance. It is created once at module level and
# reused by ignoreVPS() for every public-suffix lookup.
psl = PublicSuffixList()

# Default location of the valid top-level-domain list on DBFS.
TLD_FILE_PATH = '/dbfs/FileStore/tables/tlds_alpha_by_domain-402ba.txt'

# Parsed TLD lists keyed by file path. has_hvltd() and contains_TLD_subdomain()
# call the loader once per domain, so without this cache the file was re-read
# for every row of the dataset.
_TLD_CACHE = {}


def load_topLevelDomain(path=TLD_FILE_PATH):
  """Return the list of valid top-level domains read from ``path``.

  Each non-empty line of the file becomes one entry, with surrounding
  whitespace removed. Previously only '\\n' was stripped, and blank lines
  produced an empty-string entry. Callers compare against upper-cased labels,
  so the file presumably lists TLDs in upper case (TODO confirm).

  The file is parsed once per path and cached. Each call returns a fresh list,
  so a caller mutating the result cannot corrupt the cache.

  :param path: text file with one TLD per line; defaults to the DBFS copy.
  :return: list of TLD strings in file order.
  """
  if path not in _TLD_CACHE:
    with open(path, 'r') as content:
      _TLD_CACHE[path] = tuple(
          line.strip() for line in content if line.strip())
  return list(_TLD_CACHE[path])


def ignoreVPS(domain):
    """Return ``domain`` with its valid public suffix (VPS) removed.

    The suffix is looked up with the module-level ``psl`` (PublicSuffixList).
    For example, if psl reports ``com`` for ``'mail.company.com'``, the result
    is ``'mail.company'``.

    :param domain: domain name string.
    :return: the text in front of ``'.' + suffix`` when ``domain`` is longer
        than that. ``''`` when ``domain`` is exactly ``'.' + suffix``
        (previously the int ``0``, which broke every caller's str methods).
        Otherwise ``domain`` unchanged.
    """
    suffix = psl.publicsuffix(domain)
    if not suffix:
        # NOTE(review): guard for inputs psl cannot classify (presumably it
        # returns None for those); '.' + None would raise TypeError.
        return domain
    validPublicSuffix = '.' + suffix
    if len(validPublicSuffix) < len(domain):
        # Cut the suffix off the END by length. str.index() finds the FIRST
        # occurrence, e.g. '.com' inside '.company' in 'mail.company.com'.
        # It would also raise ValueError if the text were not found verbatim.
        return domain[:-len(validPublicSuffix)]
    if len(validPublicSuffix) == len(domain):
        return ''
    return domain

def typeTo_Binary(type):
  """Encode the ``Type`` label as an int: ``'DGA'`` -> 1, anything else -> 0."""
  is_dga = (type == 'DGA')
  return 1 if is_dga else 0

def domain_length(domain):
  """Domain Name Length (DNL): number of characters in ``domain``, dots included."""
  character_count = len(domain)
  return character_count

def subdomains_number(domain):
  """Number of Subdomains (NoS).

  Counts the dot-separated labels left after ignoreVPS() strips the public
  suffix, computed as (number of dots) + 1.
  """
  remainder = ignoreVPS(domain)
  dot_count = remainder.count('.')
  return dot_count + 1

def subdomain_length_mean(domain):
  """Subdomain Length Mean (SLM).

  Average label length of the part of ``domain`` kept by ignoreVPS():
  non-dot characters divided by the number of labels.
  """
  remainder = ignoreVPS(domain)
  dots = remainder.count('.')
  label_count = dots + 1
  char_count = len(remainder) - dots
  return char_count / label_count

def has_www_prefix(domain):
  """Has www Prefix (HwP): 1 when the first dot-separated label is exactly 'www', else 0."""
  first_label = domain.split('.')[0]
  return 1 if first_label == 'www' else 0
  
def has_hvltd(domain):
  """Has a Valid Top Level Domain (HVTLD).

  Returns 1 when the last dot-separated label of ``domain``, upper-cased,
  appears in load_topLevelDomain(); otherwise 0.
  """
  valid_tlds = load_topLevelDomain()
  last_label = domain.split('.')[-1]
  return 1 if last_label.upper() in valid_tlds else 0
  
def contains_single_character_subdomain(domain):
  """Contains Single-Character Subdomain (CSCS).

  Splits the part of ``domain`` kept by ignoreVPS() on '.' and returns 1 if
  any label is exactly one character long, else 0.
  """
  # The previous loop ran only to len(labels) - 2. The last label was therefore
  # examined only when it was also the first one, so 'ab.c' -> ['ab', 'c'] gave 0.
  labels = ignoreVPS(domain).split('.')
  return 1 if any(len(label) == 1 for label in labels) else 0

def contains_TLD_subdomain(domain):
  """Contains TLD as Subdomain (CTS).

  Returns 1 if any label of ignoreVPS(domain) except the right-most one,
  upper-cased, is in load_topLevelDomain(); otherwise 0.
  """
  labels = ignoreVPS(domain).split('.')
  valid_tlds = load_topLevelDomain()
  # NOTE(review): the right-most remaining label (presumably the registered
  # name) is not checked. Confirm this exclusion is intended.
  candidates = labels[:-1]
  return 1 if any(label.upper() in valid_tlds for label in candidates) else 0

def underscore_ratio(domain):
  """Underscore Ratio (UR).

  Number of '_' characters divided by the number of non-dot characters in the
  part of ``domain`` kept by ignoreVPS().

  :return: the ratio as a float. 0.0 when no non-dot characters remain.
      Previously that case raised ZeroDivisionError, which aborted
      extract_features() for the whole DataFrame.
  """
  remainder = ignoreVPS(domain)
  char_count = len(remainder) - remainder.count('.')
  if char_count == 0:
    return 0.0
  return remainder.count('_') / char_count

def contains_IP_address(domain):
  """Contains IP Address (CIPA).

  Returns 1 when every dot-separated label of ``domain`` consists only of
  ASCII digits (e.g. '192.168.0.1'), else 0.
  """
  # re.match is anchored only at the start, so a label such as '1abc'
  # previously counted as numeric. fullmatch requires the whole label to be
  # digits. A raw string also avoids the invalid '\d' escape warning.
  labels = domain.split('.')
  return 1 if all(re.fullmatch(r'[0-9]+', label) for label in labels) else 0

def contains_digit(domain):
  """Contains Digits: 1 if the part of ``domain`` kept by ignoreVPS() has any digit character, else 0."""
  return 1 if any(ch.isdigit() for ch in ignoreVPS(domain)) else 0

def vowel_ratio(domain):
  """Vowel Ratio: share of letters in ignoreVPS(domain) that are vowels (a, e, i, o, u).

  Letters are compared case-insensitively; previously uppercase vowels were
  counted as consonants.

  :return: the ratio as a float, or None when the remainder contains no
      letters. Such rows are presumably dropped later by the
      VectorAssembler(handleInvalid="skip") stage.
  """
  vowels = set('aeiou')
  letters = [ch.lower() for ch in ignoreVPS(domain) if ch.isalpha()]
  # The old guard was `> 1`, which returned None for a single letter even
  # though the ratio is well defined there. Only zero letters is undefined.
  if not letters:
    return None
  return sum(ch in vowels for ch in letters) / len(letters)

def digit_ratio(domain):
  """Digit Ratio: share of digits among the letters and digits in ignoreVPS(domain).

  :return: the ratio as a float, or None when the remainder has no letters
      or digits. Such rows are presumably dropped later by the
      VectorAssembler(handleInvalid="skip") stage.
  """
  alnum = [ch for ch in ignoreVPS(domain) if ch.isalpha() or ch.isdigit()]
  # The old guard was `> 1`, which returned None for a single character even
  # though the ratio is well defined there. Only zero characters is undefined.
  if not alnum:
    return None
  return sum(ch.isdigit() for ch in alnum) / len(alnum)
  
def prc_rrc(domain):
  """Ratio of Repeated Characters (RRC) in the part of ``domain`` kept by ignoreVPS().

  Dots are ignored. The result is (distinct characters occurring more than
  once) / (distinct characters).

  :return: the ratio as a float. 0.0 when nothing but dots remains;
      previously that case raised ZeroDivisionError.
  """
  counts = collections.Counter(ignoreVPS(domain).replace('.', ''))
  if not counts:
    return 0.0
  repeated = sum(1 for n in counts.values() if n > 1)
  return repeated / len(counts)

def prc_rcc(domain):
  """Ratio of Consecutive Consonants (RCC).

  Sums the lengths of all runs of two or more consecutive consonants in
  ignoreVPS(domain) and divides by the remainder's total length (dots
  included). A consonant is a letter other than a, e, i, o, u, compared
  case-insensitively.

  :return: the ratio as a float. 0.0 for an empty remainder; previously that
      case raised UnboundLocalError.
  """
  vowels = set('aeiou')
  subdomain = ignoreVPS(domain)
  if not subdomain:
    return 0.0
  total = 0
  run = 0
  for ch in subdomain:
    if ch.isalpha() and ch.lower() not in vowels:
      run += 1
    else:
      if run > 1:
        total += run
      run = 0
  # Flush a run that reaches the end of the string. The old code gated this
  # on a position counter that was reset every iteration, so trailing runs
  # were dropped: 'abcd' gave 0 instead of 3/4.
  if run > 1:
    total += run
  return total / len(subdomain)

def prc_rcd(domain):
  """Ratio of Consecutive Digits (RCD).

  Sums the lengths of all runs of two or more consecutive digits in
  ignoreVPS(domain) and divides by the remainder's total length (dots
  included).

  :return: the ratio as a float. 0.0 for an empty remainder; previously that
      case raised UnboundLocalError.
  """
  subdomain = ignoreVPS(domain)
  if not subdomain:
    return 0.0
  total = 0
  run = 0
  for ch in subdomain:
    if ch.isdigit():
      run += 1
    else:
      if run > 1:
        total += run
      run = 0
  # Flush a run that reaches the end of the string. The old code gated this
  # on a position counter that was reset every iteration, so trailing runs
  # were dropped: 'ab123' gave 0 instead of 3/5.
  if run > 1:
    total += run
  return total / len(subdomain)

def prc_entropy(domain):
    """Shannon entropy, in bits, of the characters in ignoreVPS(domain).

    :param domain: domain name; its public suffix is removed before counting.
    :return: -sum(p * log(p) / log(2)) over the relative frequency p of each
        distinct character. An empty remainder yields 0.
    """
    subdomain = ignoreVPS(domain)
    length = len(subdomain)
    # Counter preserves first-occurrence order, so terms are summed in the
    # same order as before.
    frequencies = collections.Counter(subdomain)
    probabilities = [float(count) / length for count in frequencies.values()]
    log_two = math.log(2.0)
    return -sum([p * math.log(p) / log_two for p in probabilities])

In [5]:
def extract_features():
  """Add one engineered-feature column per extractor to ``pandas_mixed_domains``.

  Each column is computed by applying its extractor to the global DataFrame's
  'Domain' column. Columns are added in the order listed below. The DataFrame
  is modified in place; nothing is returned.
  """
  column_extractors = [
    ('DNL', domain_length),
    ('NoS', subdomains_number),
    ('SLM', subdomain_length_mean),
    ('HwP', has_www_prefix),
    ('HVTLD', has_hvltd),
    ('CSCS', contains_single_character_subdomain),
    ('CTS', contains_TLD_subdomain),
    ('UR', underscore_ratio),
    ('CIPA', contains_IP_address),
    ('contains_digit', contains_digit),
    ('vowel_ratio', vowel_ratio),
    ('digit_ratio', digit_ratio),
    ('RRC', prc_rrc),
    ('RCC', prc_rcc),
    ('RCD', prc_rcd),
    ('Entropy', prc_entropy),
  ]
  for column_name, extractor in column_extractors:
    pandas_mixed_domains[column_name] = pandas_mixed_domains['Domain'].apply(extractor)

In [6]:
extract_features()
# Build the Spark DataFrame from the engineered pandas frame. The
# 'DGA_family' column is not used as a feature, so it is dropped.
mixed_domains = spark.createDataFrame(pandas_mixed_domains).drop('DGA_family')
# Release the pandas copy now that the Spark DataFrame has been created.
del pandas_mixed_domains
gc.collect()

In [7]:
# Save the featured DataFrame to a CSV file so that, after a restart, it can be reloaded instead of waiting for the features to be recomputed.
# mixed_domains.coalesce(1).write.format("com.databricks.spark.csv").option("header", "true").save("/FileStore/tables/featured_domains6.csv")
# mixed_domains = sqlContext.read.format('csv').options(header='true', inferSchema='true').load('/FileStore/tables/featured_domains6.csv')
# mixed_domains.show()
# del mixed_domains2
# gc.collect()

In [8]:
def evaluate_models(predictions):
  """Show 50 prediction rows and print the test error (1 - accuracy).

  :param predictions: DataFrame from a fitted model's transform(). It must
      have 'Domain', 'Type', 'features', 'predictedLabel', 'indexed_type' and
      'prediction' columns.
  """
  preview_columns = ["Domain", "Type", "features", "predictedLabel"]
  predictions.select(*preview_columns).show(50)

  # Accuracy of the numeric prediction against the indexed true label.
  accuracy = MulticlassClassificationEvaluator(
      labelCol="indexed_type",
      predictionCol="prediction",
      metricName="accuracy",
  ).evaluate(predictions)
  print("Test Error = %g" % (1.0 - accuracy))

In [9]:
# Encode the string label 'Type' as a numeric index column for the classifiers.
type_indexer = StringIndexer(inputCol="Type", outputCol="indexed_type").fit(mixed_domains)
indexed_type_domains = type_indexer.transform(mixed_domains)
# Split the data into training and test sets (30% held out for testing).
# The fixed seed keeps the split reproducible. The GBT and Naive Bayes cells
# reuse it, so their reported test errors become reproducible too.
(trainingData, testData) = indexed_type_domains.randomSplit([0.7, 0.3], seed=42)

# Pack the engineered feature columns into one vector column. Rows with a
# missing feature (vowel_ratio / digit_ratio may be None) are skipped.
assembler = VectorAssembler(
    inputCols=["DNL","NoS","SLM","HwP","HVTLD","CSCS","CTS","UR","CIPA","Entropy", "RRC", "RCC","RCD","contains_digit","vowel_ratio","digit_ratio"],
    outputCol="features", handleInvalid = "skip")

# Convert indexed predictions back to the original 'Type' labels.
labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel",labels=type_indexer.labels)

# Random forest classifier. numTrees=10 is overridden by the CV grid below.
rf = RandomForestClassifier(labelCol="indexed_type", featuresCol="features", numTrees=10, seed=42)

# Assemble features -> random forest -> map predictions back to labels.
rf_pipeline = Pipeline(stages=[assembler, rf, labelConverter])

# Use CV to choose maxDepth and numTrees.
rf_paramGrid = ParamGridBuilder().addGrid(rf.maxDepth, [10, 15, 20]).addGrid(rf.numTrees, [20, 30]).build()

rf_evaluator = MulticlassClassificationEvaluator(labelCol="indexed_type", predictionCol="prediction", metricName="accuracy")

# 3-fold CV with a fixed seed so the fold assignment is reproducible.
rf_crossval = CrossValidator(estimator=rf_pipeline, estimatorParamMaps=rf_paramGrid, evaluator=rf_evaluator, numFolds=3, seed=42)

# Run cross-validation and keep the best set of parameters.
rf_cvModel = rf_crossval.fit(trainingData)

# Predict the held-out test set with the selected model and report.
rf_predictions = rf_cvModel.transform(testData)
evaluate_models(rf_predictions)


In [10]:
# Gradient-boosted tree classifier. Reuses assembler, labelConverter,
# trainingData and testData from the random-forest cell.

# maxIter=10 here is overridden by the CV grid below.
gbt = GBTClassifier(labelCol="indexed_type", featuresCol="features", maxIter=10, seed=42)

# Assemble features -> GBT -> map predictions back to 'DGA' / 'Normal'.
gbt_pipeline = Pipeline(stages=[assembler, gbt, labelConverter])

# Use CV to choose maxDepth and maxIter.
gbt_paramGrid = ParamGridBuilder().addGrid(gbt.maxDepth, [15, 20]).addGrid(gbt.maxIter, [20, 30]).build()

gbt_evaluator = MulticlassClassificationEvaluator(labelCol="indexed_type", predictionCol="prediction", metricName="accuracy")

# 3-fold CV with a fixed seed so the fold assignment is reproducible.
gbt_crossval = CrossValidator(estimator=gbt_pipeline, estimatorParamMaps=gbt_paramGrid, evaluator=gbt_evaluator, numFolds=3, seed=42)

# Run cross-validation and keep the best set of parameters.
gbt_cvModel = gbt_crossval.fit(trainingData)

# Predict the held-out test set with the selected model and report.
gbt_predictions = gbt_cvModel.transform(testData)
evaluate_models(gbt_predictions)

In [11]:
# Repeats the evaluation already run at the end of the previous cell: shows 50
# GBT prediction rows and prints the test error again.
evaluate_models(gbt_predictions)

In [12]:
# Naive Bayes classifier. Reuses assembler, labelConverter, trainingData and
# testData from the random-forest cell.
# NOTE(review): multinomial NB expects non-negative feature values. The
# engineered features here appear non-negative; re-check if new ones are added.
# smoothing=1.0 here is overridden by the CV grid below.
nb = NaiveBayes(featuresCol='features', labelCol='indexed_type', smoothing=1.0, modelType="multinomial")

# Assemble features -> Naive Bayes -> map predictions back to labels.
nb_pipeline = Pipeline(stages=[assembler, nb, labelConverter])

# Use CV to choose the smoothing parameter.
nb_paramGrid = ParamGridBuilder().addGrid(nb.smoothing, [0.1, 0.3, 0.5]).build()

nb_evaluator = MulticlassClassificationEvaluator(labelCol="indexed_type", predictionCol="prediction", metricName="accuracy")

# 3-fold CV with a fixed seed so the fold assignment is reproducible.
nb_crossval = CrossValidator(estimator=nb_pipeline, estimatorParamMaps=nb_paramGrid, evaluator=nb_evaluator, numFolds=3, seed=42)

# Run cross-validation and keep the best set of parameters.
nb_cvModel = nb_crossval.fit(trainingData)

# Predict the held-out test set with the selected model and report.
nb_predictions = nb_cvModel.transform(testData)

evaluate_models(nb_predictions)