In [1]:
import random
import re

from faker import Faker
from faker.providers import phone_number

In [2]:
# Constants

MINUTE_CONSTRAINT = 2
N_CIRCLES = 20
CALLS_CONSTRAINT = 2

In [4]:
# !python -m pip install Faker

# Нормализация номера

Буду приводить номера к виду: **1234567890**, без дефисов, точек и тп, потому что это лишняя информация, которая никак нам не поможет, нам важны только цифры.

In [6]:
def normalize_phone_number(number: str) -> str:
    number = number.split('x')[0]

    digits = [i for i in number if i.isdigit()][-10:]

    return ''.join(digits)

## Протестим нормализацию на нескольких номерах:

In [14]:
numbers = [
    '(031)381-2655',
    '519-336-8985x6623',
    '+1-885-895-5842x47265',
    '9255880772',
    '001-866-784-8782x84277',
    '+1-540-961-2027x36866',
    '784-137-2019',
    '(419)148-4019x156',
]

normalized_numbers = [
    '0313812655',
    '5193368985',
    '8858955842',
    '9255880772',
    '8667848782',
    '5409612027',
    '7841372019',
    '4191484019',
]

for i in range(len(numbers)):
    print(f"Проверяется номер: {numbers[i]}")
    assert normalize_phone_number(numbers[i]) == normalized_numbers[i]

print()
print("Все тесты прошли успешно!")

Проверяется номер: (031)381-2655
Проверяется номер: 519-336-8985x6623
Проверяется номер: +1-885-895-5842x47265
Проверяется номер: 9255880772
Проверяется номер: 001-866-784-8782x84277
Проверяется номер: +1-540-961-2027x36866
Проверяется номер: 784-137-2019
Проверяется номер: (419)148-4019x156

Все тесты прошли успешно!


In [15]:
with open('Control_point_2/numbers.txt', 'r') as file1, open('Control_point_2/numbers_clean.txt', 'w') as file2:
    while True:
        number = file1.readline()

        if not number:
            break

        file2.write(normalize_phone_number(number) + '\n')

# Пополним список подозреваемых:

In [37]:
fake = Faker()
fake.add_provider(phone_number)
pnumbers = [fake.phone_number() for i in range(100)]
calls = [ {"caller": random.choice(pnumbers), "recipient": random.choice(pnumbers), "duration_s": random.randint(5, 600)} for i in range(1500) ]

suspects = random.choices(pnumbers,k=10)
print(suspects)

with open("Control_point_2/numbers.txt", "w+") as file:
    file.writelines([f'{str(number)}\n' for number in pnumbers])

with open("Control_point_2/suspects.txt", "w+") as file:
    file.writelines([f'{str(number)}\n' for number in suspects])

with open("Control_point_2/calls.txt", "w+") as file:
        file.writelines([f'caller:{call.get("caller")}|recipient:{call.get("recipient")}|duration_s:{call.get("duration_s")}\n' for call in calls])


['167.986.4800', '(002)317-7838x37908', '202-533-5657x831', '703-493-3046', '+1-432-997-7653x5062', '435-569-6100x137', '(965)145-4222', '+1-032-989-9674x248', '(598)120-7013', '938.145.4062x95941']


In [38]:
suspects_0 = set()

with open('Control_point_2/suspects.txt', 'r') as file:
    numbers = [i[:-1] for i in file.readlines()]

for num in numbers:
    suspects_0.add(normalize_phone_number(num))

In [39]:
suspects_0

{'0023177838',
 '0329899674',
 '1679864800',
 '2025335657',
 '4329977653',
 '4355696100',
 '5981207013',
 '7034933046',
 '9381454062',
 '9651454222'}

In [40]:
def check_suspects(suspects: set, caller: str, recipient: str, duration_s: str, minute_constraint=2) -> bool:
    if recipient in suspects and duration_s / 60 > minute_constraint:
        return True
    return False

Решил не использовать функцию выше, так как при её использовании нужно будет передавать множество suspects, которое потенциально может быть очень большим. Такие лишние копирование большой области памяти нам не нужны. 

In [44]:
suspects_1 = set()

with open('Control_point_2/calls.txt', 'r') as file, open('Control_point_2/proofs.txt', 'w') as file_proofs:
    while True:
        calls = {}

        line = file.readline()
        if not line:
            break

        line = line.split('|')

        calls['caller'] = normalize_phone_number(line[0])
        calls['recipient'] = normalize_phone_number(line[1])
        calls['duration_s'] = int(line[2][11:])

        if calls['recipient'] in suspects_0 and calls['duration_s'] / 60 > MINUTE_CONSTRAINT:
            suspects_1.add(calls['caller'])

            file_proofs.write(f"caller:{calls['caller']}|recipient:{calls['recipient']}|duration_s:{calls['duration_s']}\n")


In [42]:
suspects_1

{'0023177838',
 '0127356191',
 '0288118381',
 '0323121165',
 '0329899674',
 '0753678659',
 '1129320710',
 '1468745897',
 '1548641885',
 '1630458536',
 '1745182453',
 '1909221679',
 '2044000781',
 '2204703803',
 '2206256733',
 '2250351174',
 '2331029737',
 '2391774938',
 '2483967113',
 '2574070582',
 '2779248991',
 '2946238545',
 '3197457978',
 '3334976243',
 '3385163024',
 '3427192045',
 '3448303234',
 '3477383805',
 '3601932743',
 '3810326337',
 '3824159156',
 '4007090986',
 '4038110115',
 '4070958602',
 '4300831611',
 '4568168910',
 '4576049948',
 '4641808446',
 '4846304359',
 '4911789965',
 '4998601850',
 '5132356084',
 '5422833844',
 '5454798785',
 '5486455746',
 '5594661910',
 '5754016616',
 '6039158482',
 '6201197516',
 '6416632580',
 '6619666659',
 '7015748983',
 '7024025401',
 '7361373810',
 '7725076526',
 '7926092288',
 '8189304588',
 '8196212340',
 '8205227784',
 '8397312683',
 '8637267475',
 '8855487747',
 '9260444619',
 '9528157011',
 '9651454222',
 '9777474531'}

In [43]:
with open('Control_point_2/suspects.txt', 'a') as file:
    for num in suspects_1:
        file.write(num + '\n')

# Теперь попробуем пополнить список звонков и сделать N-e круги.

In [47]:
fake = Faker()
fake.add_provider(phone_number)
pnumbers = [fake.phone_number() for i in range(300)]
pnumbers = [normalize_phone_number(str(number)) for number in pnumbers]

calls = [ {"caller": random.choice(pnumbers), "recipient": random.choice(pnumbers), "duration_s": random.randint(5, 600)} for i in range(15000) ]

suspects = random.choices(pnumbers,k=30)
print(suspects)

with open("Control_point_3/numbers.txt", "w+") as file:
    file.writelines([f'{str(number)}\n' for number in pnumbers])

with open("Control_point_3/suspects.txt", "w+") as file:
    file.writelines([f'{str(number)}\n' for number in suspects])

with open("Control_point_3/calls.txt", "w+") as file:
    file.writelines([f'caller:{call.get("caller")}|recipient:{call.get("recipient")}|duration_s:{call.get("duration_s")}\n' for call in calls])


['4511649208', '8752003827', '6755016690', '6964912789', '0302704296', '0230964277', '3538068831', '0628854739', '4511649208', '1735302177', '6326319211', '2297090567', '0628854739', '7364040387', '6964912789', '4297384496', '1985308524', '4883094484', '7291158963', '8722628228', '7419256086', '2405762858', '0005642086', '9261357652', '2706051168', '6259100086', '3457203657', '2959805031', '3761800778', '5717766958']


Прочитаем текущих подозреваемых:

In [48]:
suspects_0 = set()

with open('Control_point_3/suspects.txt', 'r') as file:
    numbers = [i[:-1] for i in file.readlines()]

for number in numbers:
    suspects_0.add(number)

In [49]:
suspects_0

{'0005642086',
 '0230964277',
 '0302704296',
 '0628854739',
 '1735302177',
 '1985308524',
 '2297090567',
 '2405762858',
 '2706051168',
 '2959805031',
 '3457203657',
 '3538068831',
 '3761800778',
 '4297384496',
 '4511649208',
 '4883094484',
 '5717766958',
 '6259100086',
 '6326319211',
 '6755016690',
 '6964912789',
 '7291158963',
 '7364040387',
 '7419256086',
 '8722628228',
 '8752003827',
 '9261357652'}

In [50]:
n = 1

prev_suspects = suspects_0

while n <= N_CIRCLES:
    with open('Control_point_3/calls.txt', 'r') as file_calls, \
         open('Control_point_3/suspects.txt', 'a') as file_suspects, \
         open('Control_point_3/proofs.txt', 'w') as file_proofs:
        curr_suspects = set()

        while True:
            calls = {}

            line = file_calls.readline()
            if not line:
                break

            line = line.split('|')

            calls['caller'] = line[0].split('caller:')[1]
            calls['recipient'] = line[1].split('recipient:')[1]
            calls['duration_s'] = int(line[2].split('duration_s:')[1])

            if calls['recipient'] in prev_suspects and calls['duration_s'] / 60 > MINUTE_CONSTRAINT:
                curr_suspects.add(calls['caller'])

                file_proofs.write(f"caller:{calls['caller']}|recipient:{calls['recipient']}|duration_s:{calls['duration_s']}\n")

        if not curr_suspects:
            break
        
        file_suspects.write("New circle" + '\n')
        file_suspects.write(f"-----------{n}-circle of suspects:-----------" + '\n')
        for number in curr_suspects:
            file_suspects.write(number + '\n')
            
        
        prev_suspects = curr_suspects
        n += 1

In [51]:
print(f"We reached {n-1} circles of suspects.")

We reached 20 circles of suspects.


### Как валидировать файл Control_point_3/suspects.txt?

Его можно валидировать по строке **New circle**. Например:

In [52]:
with open('Control_point_3/suspects.txt', 'r') as file:
    while True:
        number = file.readline()

        if not number:
            break
        elif number == "New circle\n":
            circle = file.readline()

            search = re.search(r'\d+', circle)
            n = int(search.group(0))
            print(f"Processing circle {n} of suspects.")

            continue

Processing circle 1 of suspects.
Processing circle 2 of suspects.
Processing circle 3 of suspects.
Processing circle 4 of suspects.
Processing circle 5 of suspects.
Processing circle 6 of suspects.
Processing circle 7 of suspects.
Processing circle 8 of suspects.
Processing circle 9 of suspects.
Processing circle 10 of suspects.
Processing circle 11 of suspects.
Processing circle 12 of suspects.
Processing circle 13 of suspects.
Processing circle 14 of suspects.
Processing circle 15 of suspects.
Processing circle 16 of suspects.
Processing circle 17 of suspects.
Processing circle 18 of suspects.
Processing circle 19 of suspects.
Processing circle 20 of suspects.


# Усложним правила отбора подозреваемых.

In [53]:
# Constants

MINUTE_CONSTRAINT = 2
N_CIRCLES = 20
CALLS_CONSTRAINT = 3

In [54]:
fake = Faker()
fake.add_provider(phone_number)
pnumbers = [fake.phone_number() for i in range(20)]
pnumbers = [normalize_phone_number(str(number)) for number in pnumbers]

calls = [ {"caller": random.choice(pnumbers), "recipient": random.choice(pnumbers), "duration_s": random.randint(50, 600)} for i in range(50000) ]

suspects = random.choices(pnumbers,k=10)
print(suspects)

with open("Control_point_4/numbers.txt", "w+") as file:
    file.writelines([f'{str(number)}\n' for number in pnumbers])

with open("Control_point_4/suspects.txt", "w+") as file:
    file.writelines([f'{str(number)}\n' for number in suspects])

with open("Control_point_4/calls.txt", "w+") as file:
        file.writelines([f'caller:{call.get("caller")}|recipient:{call.get("recipient")}|duration_s:{call.get("duration_s")}\n' for call in calls])


['1053306531', '0558987462', '7016263283', '7723765891', '4210734819', '1920264448', '4030277152', '0052313714', '4214517778', '6083274860']


In [55]:
prev_suspects = set()

with open('Control_point_4/suspects.txt', 'r') as file:
    numbers = [i[:-1] for i in file.readlines()]

for number in numbers:
    prev_suspects.add(number)

In [56]:
n = 1

while n <= N_CIRCLES:
    with open('Control_point_4/calls.txt', 'r') as file_calls, \
         open('Control_point_4/suspects.txt', 'a') as file_suspects, \
         open('Control_point_4/proofs.txt', 'w') as file_proofs:
        curr_suspects = set()

        while True:
            calls = {}
            supposed_suspects = {}

            line = file_calls.readline()
            if not line:
                break

            line = line.split('|')

            calls['caller'] = line[0].split('caller:')[1]
            calls['recipient'] = line[1].split('recipient:')[1]
            calls['duration_s'] = int(line[2].split('duration_s:')[1])

            if calls['recipient'] in prev_suspects and calls['duration_s'] / 60 > MINUTE_CONSTRAINT:
                if calls['caller'] in supposed_suspects:
                    supposed_suspects[calls['caller']] += 1
                else:
                    supposed_suspects[calls['caller']] = 1
                
                if supposed_suspects[calls['caller']] == CALLS_CONSTRAINT:
                    curr_suspects.add(calls['caller'])

                    file_proofs.write(f"caller:{calls['caller']}|recipient:{calls['recipient']}|duration_s:{calls['duration_s']}\n")

        if not curr_suspects:
            break
        
        file_suspects.write("New circle" + '\n')
        file_suspects.write(f"-----------{n}-circle of suspects:-----------" + '\n')
        for number in curr_suspects:
            file_suspects.write(number + '\n')
        
        prev_suspects = curr_suspects
        n += 1

In [57]:
print(f"We reached {n-1} circles of suspects.")

We reached 0 circles of suspects.
