In [None]:
'''
Analyzing imports for app-integration and their test code
'''

In [1]:
import os
from os.path import join as pjoin
import ast
import pandas as pd

In [2]:
from google_play_scraper import app
from bs4 import BeautifulSoup
import requests
from tqdm import tqdm
import pandas as pd

In [3]:
def getHtmlContent(url):
    response=requests.get(url)
    htmlContent=BeautifulSoup(response.content,"html.parser")
    return htmlContent

In [4]:
def count_methods_classes_lines(file_path):
    with open(file_path, 'r') as file:
        code = file.read()

    tree = ast.parse(code)

    # Initialize counters
    num_methods = 0
    num_classes = 0
    num_lines = len(code.splitlines())

    for node in ast.walk(tree):
        if isinstance(node, ast.FunctionDef):
            num_methods += 1
        elif isinstance(node, ast.ClassDef):
            num_classes += 1

    return num_methods, num_classes, num_lines

In [5]:
# def count_methods_classes_lines(file_path):
#     with open(file_path, 'r', encoding='utf-8') as file:
#         source_code = file.read()

#     tree = ast.parse(source_code)

#     class_count = 0
#     method_count = 0
#     line_count = len(source_code.splitlines())
#     async_method_count = 0

#     for node in ast.walk(tree):
#         if isinstance(node, ast.ClassDef):
#             class_count += 1

#         if isinstance(node, ast.FunctionDef):
#             method_count += 1
#             if any(isinstance(decorator, ast.Name) and decorator.id == 'async' for decorator in node.decorator_list):
#                 async_method_count += 1

#     return class_count, method_count, line_count, async_method_count

In [6]:
def countDict(input_list):
    from collections import defaultdict   
    # Create a defaultdict to count occurrences of each element
    count_dict = defaultdict(int)
    
    # Count occurrences of each element in the list
    for item in input_list:
        count_dict[item] += 1
    
    # Convert defaultdict to a regular dictionary
    result_dict = dict(count_dict)
    sorted_dict = {k: v for k, v in sorted(result_dict.items(), key=lambda item: item[1],reverse=True)}

    return sorted_dict

In [7]:
components_dir="/home/safwat/Documents/IoT_Testing/integration_samples/core/homeassistant/components"
test_components_dir="/home/safwat/Documents/IoT_Testing/integration_samples/core/tests/components"

apps=[app for app in os.listdir(components_dir) if "." not in app]
test_apps=[app for app in os.listdir(test_components_dir) if "." not in app]

In [8]:
# apps=[app for app in os.listdir(components_dir) if "." not in app]
# df=pd.DataFrame([],columns=["app","categories","num of files","file dist","python %","highest classes in a file","highest methods in a file",\
#                            "highest lines in a file","avg classes","avg methods","avg lines"])
# for app in tqdm(apps):
#     hiClasses, hiMethods, hiLines = 0,0,0
#     totalClasses, totalMethods, totalLines = 0,0,0
    
#     files=[os.path.relpath(os.path.join(root, file), pjoin(components_dir,app)) \
#            for root, _, files in os.walk(pjoin(components_dir,app)) for file in files]
#     extensions=[file.split(".")[-1] for file in files]
#     ext_count=countDict(extensions)
    
#     pyFiles=[file for file in files if file.endswith(".py")]
#     pyPerc=round(len(pyFiles)/len(files)*100,1)
#     soup = getHtmlContent(f"https://www.home-assistant.io/integrations/{app}")
#     for lab in soup.select("h1"):
#         if(lab.text=="Categories"):
#             cat=lab.find_next().text
#     categories=[c for c in cat.split("\n") if len(c)>0]
    
#     for pyFile in pyFiles:
#         class_count, method_count, num_lines = count_methods_classes_lines(pjoin(components_dir,app,pyFile))
#         totalClasses+=class_count
#         totalMethods+=method_count
#         totalLines+=num_lines
        
#         if class_count>hiClasses: hiClasses=class_count
#         if method_count>hiMethods: hiMethods=method_count
#         if num_lines>hiLines: hiLines=num_lines
#     if len(pyFiles)>0:
#         denom=len(pyFiles)
#     else:
#         denom=1
#     avgClasses=round(totalClasses/denom)
#     avgMethods=round(totalMethods/denom)
#     avgLines=round(totalLines/denom)
#     df.loc[len(df)]=[app,categories,len(files),ext_count,pyPerc,hiClasses,hiMethods,hiLines,avgClasses,avgMethods,avgLines]

In [9]:
# df.head(10)

In [9]:
# df.to_csv("integration_test_codes_analysis.csv")

In [27]:
def testCodeAnalysis(codeFolder, testFolder):
    codeFiles=[]
    for root,dir,files in os.walk(codeFolder):
        for file in files:
            if file.endswith(".py"):
                codeFiles.append((root,file))
    testCodeFiles=[]
    for root,dir,files in os.walk(testFolder):
        for file in files:
            if file.endswith(".py"):
                testCodeFiles.append((root,file))

    matchedTests=[]
    for cRoot,cfile in codeFiles:
        for tRoot, tfile in testCodeFiles:
            if cfile in tfile and "test" in tfile:
                matchedTests.append((pjoin(cRoot,cfile),pjoin(tRoot,tfile)))
                break
            
    return matchedTests,codeFiles, testCodeFiles

In [28]:
import ast
from collections import namedtuple

Import = namedtuple("Import", ["module", "name", "alias"])

def test_imports_list(code):
    testImports=[]
    def get_imports(code):
        with open(code) as fh:        
           root = ast.parse(fh.read(), code)
    
        for node in ast.iter_child_nodes(root):
            if isinstance(node, ast.Import):
                module = []
            elif isinstance(node, ast.ImportFrom):
                if node.module is not None:
                    module = node.module.split('.')
            else:
                continue
    
            for n in node.names:
                yield Import(module, n.name.split('.'), n.asname)

    for i in get_imports(code):
        for name in i.name:
            if "test" in name and name not in testImports:
                testImports.append(name)
        for module in i.module:
            if "test" in module and module not in testImports:
                testImports.append(module)
    return testImports



In [29]:
test_imports_list(pjoin(test_components_dir,"acmeda","test_config_flow.py"))

['unittest', 'pytest', 'tests']

In [30]:
def get_project_imports(project_folder):
    totalImports=[]
    for root,dir,files in os.walk(project_folder):
        for file in files:
            if file.endswith(".py"):
                testImports=test_imports_list(pjoin(root,file))
                for ti in testImports:
                    if ti not in totalImports: totalImports.append(ti)
    return totalImports
                

In [31]:
def get_filelist_imports(filelist):
    totalImports=[]
    for file in filelist:
        if file.endswith(".py"):
            testImports=test_imports_list(pjoin(file))
            for ti in testImports:
                if ti not in totalImports: totalImports.append(ti)
    return totalImports

In [32]:
testCodeDF=pd.DataFrame([],columns=["app","test_code_exists","test_code%","python_test_imports"])
for app in tqdm(apps):
    if not os.path.exists(pjoin(test_components_dir,app)):
        testCodeDF.loc[len(testCodeDF)]=[app,"False","N/A","N/A"]
    else:
        codeFolder=pjoin(components_dir,app)
        testCodeFolder=pjoin(test_components_dir,app)
        matchedTests,codeFiles, testCodeFiles=testCodeAnalysis(codeFolder, testCodeFolder)
        tcPerc=round(len(matchedTests)/len(codeFiles)*100,1)
        
        testCodeFiles= [t[1] for t in matchedTests]
        totalImports=get_filelist_imports(testCodeFiles)
        totalImportsString=", ".join(totalImports)
        testCodeDF.loc[len(testCodeDF)]=[app,"True",tcPerc,totalImportsString]
            
#Add code for calculating local imports
localImports=get_project_imports("test_local_imports")
localImportsString=", ".join(localImports)
testCodeDF.loc[len(testCodeDF)]=["local_imports","True","N/A",localImportsString]

100%|██████████████████████████████████████| 1158/1158 [00:02<00:00, 467.78it/s]


In [33]:
len(apps)

1158

In [34]:
len(test_apps)

791

In [35]:
len(testCodeDF)

1159

In [36]:
testCodeDF.to_csv("test_analysis.csv")

In [37]:
importDict={}
allTestImports=testCodeDF["python_test_imports"].tolist()
for impString in allTestImports:
    imps=[imp.strip() for imp in impString.split(",")]
    for imp in imps:
        if imp=="":
            continue
        try:
            importDict[imp]+=1
        except:
            importDict[imp]=1
        

In [38]:
sorted_importDict = dict(sorted(importDict.items(), key=lambda item: item[1],reverse=True))
sorted_importDict

{'unittest': 629,
 'tests': 626,
 'pytest': 471,
 'N/A': 368,
 'test_util': 77,
 'conftest': 65,
 'pytest_unordered': 39,
 'test_utils': 4,
 'test_common': 4,
 'get_test_home_assistant': 3,
 'async_test_home_assistant': 3,
 'test_init': 3,
 'test_media_player': 2,
 'testing_config': 2,
 'test': 2,
 'register_test_entity': 2,
 'help_test_availability_when_connection_lost': 2,
 'help_test_discovery_removal': 2,
 'help_test_discovery_update_unchanged': 2,
 'help_test_entity_id_update_discovery_update': 2,
 'help_test_entity_id_update_subscriptions': 2,
 'testing': 2,
 'entity_test_helpers': 1,
 'test_sensor': 1,
 'test_auth_active_with_token': 1,
 'test_auth': 1,
 'get_test_instance_port': 1,
 'create_rfx_test_cfg': 1,
 'test_controller': 1,
 'help_test_entity_available': 1,
 '_test_sensors': 1,
 'test_gateway': 1,
 'create_engine_test': 1,
 'create_engine_test_for_schema_version_postfix': 1,
 'get_latest_short_term_statistics': 1,
 'test_issues': 1,
 'test_ll_hls': 1,
 'test_hls': 1,
 't

In [42]:
testImportCountDF = pd.DataFrame(list(sorted_importDict.items()), columns=['import', 'count'])
testImportCountDF.to_csv("test_import_counts.csv", index=False)